Final Exam Fall 2025: Running

Version 1.0.0

All of the header information is important. Please read it.

Topics and number of exercises: This problem builds on your knowledge of GPS data processing, geospatial analysis, SQL for sports analytics, and computer vision for route classification. It has 11 exercises, numbered 0 to 10, worth 21 points in total. However, the threshold to earn 100% is 16 points. (Therefore, once you hit 16 points you can stop; there is no extra credit for exceeding this threshold.)

Exercise ordering: Each exercise builds logically on the previous ones, but you may solve them in any order. That is, if you can't solve an exercise, you can still move on and try the next one. Use this to your advantage, as the exercises are not necessarily ordered by difficulty. Higher point values generally indicate more difficult exercises.

Demo cells: Code cells starting with the comment ### Run Me!!! load results from prior exercises applied to the entire data set and use them to build demo inputs. These cells must be run for subsequent demos to work properly, but they do not affect the test cells. The data loaded in these cells may be rather large (at least in terms of human readability). You are free to print or otherwise use Python to explore them, but we may not print them in the starter code.

Debugging your code: Right before each exercise's test cell, there is a block of text explaining the variables available to you for debugging. You may use these to test your code and can print/display them as needed (be careful when printing large objects; you may want to print the head or chunks of rows at a time).

Exercise point breakdown:

  • Exercise 0: 1 point

  • Exercise 1: 3 points

  • Exercise 2: 2 points

  • Exercise 3: 2 points

  • Exercise 4: 2 points

  • Exercise 5: 2 points

  • Exercise 6: 2 points

  • Exercise 7: 3 points

  • Exercise 8: 1 point

  • Exercise 9: 2 points

  • Exercise 10: 1 point

Final reminders:

  • Submit after every exercise
  • Review the generated grade report after you submit to see what errors were returned
  • Stay calm, skip problems as needed and take short breaks at your leisure
In [1]:
### Global imports
import dill
from cse6040_devkit import plugins, utils
from cse6040_devkit.training_wheels import run_with_timeout, suppress_stdout
import tracemalloc
from time import time
import re
import pandas as pd
import sqlite3
import numpy as np
from pprint import pprint

utils.add_from_file('handle_gdf', plugins)
plugins  # the top-level name `cse6040_devkit` is not imported above, so reference the imported `plugins` module directly

Task Overview

For this exam, you are given information about different running metrics. From that data, you will:

  • Perform Exploratory Analysis
    • Explore GPS Art by parsing, mapping, and calculating distances from a .gpx file route using latitudes and longitudes
    • Calculate other running-related metrics, especially around heart rates
  • Build a model
    • Build parts of a Convolutional Neural Network to classify types of runs based on the map images you plotted
  • Create a running route
    • Build a .gpx route by overlaying an image onto an undirected mapping network of nodes and edges representing potential running routes
In [2]:
conn = sqlite3.connect('resource/asnlib/publicdata/running.db')

Perform Exploratory Analysis

Exercise 0: (1 point)

explore_data__FREE

Example: we have defined explore_data__FREE as follows:

This is a free exercise!

Please run the test cell below to collect your FREE point!

The output will show the structure of the GPX file and the database tables which we will use for the following exercises.

In [3]:
### Solution - Exercise 0  
def explore_data__FREE(name):
    if '.gpx' in name:
        with open(name) as f:
            lines = f.readlines()
        return '\n'.join(lines)
    else:
        return pd.read_sql_query(f'''SELECT * FROM {name} LIMIT 10''', conn)

### Demo function call
name="resource/asnlib/publicdata/shape_run_paris2.gpx"
print(f'===============\n{name}\n===============\n')
print(explore_data__FREE(name)[:1000])
name="df_heartrate"
print(f'\n\n===============\n{name}\n===============\n')
display(explore_data__FREE(name))
name="df_races"
print(f'\n\n===============\n{name}\n===============\n')
display(explore_data__FREE(name))
===============
resource/asnlib/publicdata/shape_run_paris2.gpx
===============

<?xml version="1.0" encoding="UTF-8"?>

<gpx xmlns="http://www.topografix.com/GPX/1/1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.topografix.com/GPX/1/1 http://www.topografix.com/GPX/1/1/gpx.xsd" version="1.1" creator="gpx.py -- https://github.com/tkrajina/gpxpy">

  <trk>

    <trkseg>

      <trkpt lat="48.872435599999996" lon="2.3360880000000006">

      </trkpt>

      <trkpt lat="48.8722677" lon="2.3372904999999995">

      </trkpt>

      <trkpt lat="48.872473199999995" lon="2.3373655">

      </trkpt>

      <trkpt lat="48.8724855" lon="2.3374103000000006">

      </trkpt>

      <trkpt lat="48.87297929999999" lon="2.33758">

      </trkpt>

      <trkpt lat="48.87307280000001" lon="2.3376136">

      </trkpt>

      <trkpt lat="48.8730882" lon="2.3376350000000006">

      </trkpt>

      <trkpt lat="48.873477199999996" lon="2.337755">

      </trkpt>

      <trkpt lat="48.87345559999999" lon="2.3378811">

      </trkpt>

      <trkpt la


===============
df_heartrate
===============

index user minute speed heart_rate
0 0 0 0 3.7 130
1 1 0 1 3.6 133
2 2 0 2 3.5 134
3 3 0 3 3.9 130
4 4 0 4 3.6 130
5 5 0 5 3.5 133
6 6 0 6 3.9 133
7 7 0 7 3.8 132
8 8 0 8 3.8 130
9 9 0 9 3.9 132

===============
df_races
===============

index Race Runner Time
0 0 half marathon Bella 224.0
1 1 5k Ahmad 59.0
2 2 turkey trot Dylan 224.0
3 3 half marathon Sofia 185.0
4 4 marathon Emilia 270.0
5 5 turkey trot Chloe 54.0
6 6 5k Benjamin 135.0
7 7 half marathon Benjamin 128.0
8 8 400m Aubrey 29.7
9 9 marathon Al 339.0


The test cell below will always pass. Please submit to collect your free points for explore_data__FREE (exercise 0).

In [4]:
### Test Cell - Exercise 0  


print('Passed! Please submit.')
Passed! Please submit.

Exercise 1: (3 points)

parsegpx_string

Your task: define parsegpx_string as follows:

Implement a function, parsegpx_string(xmlstring), which parses latitudes and longitudes out of the input string. The input string is taken from a .gpx file, which is an XML-based format.

Inputs:

  • xmlstring: A string extracted from a .gpx file

Return:

  • latitudes_longitudes: list of dictionaries containing lat and lon key:value pairs for each entry found within the xmlstring

Requirements/steps:

  • Within each trkpt tag (which stands for track point and is nested inside a trk (track) and trkseg (track segment)), there will be lat and lon fields.
  • Extract every lat and lon and store them in a list of dictionaries containing lat and lon key:value pairs.
  • The lat and lon strings must only include the characters 0-9, ., +, and -.
  • There are multiple ways to accomplish this. Regex may be helpful; see the short sketch below.
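
For intuition only (this is a sketch, not part of the graded solution), here is how a regex with two capture groups pulls lat and lon out of a single, made-up trkpt string; the real input will contain many such tags.

import re

sample = '<trkpt lat="48.8722677" lon="2.3372905">'   # made-up single track point
pattern = re.compile(r'lat="([0-9.+\-]+)" lon="([0-9.+\-]+)"')
print(pattern.findall(sample))   # [('48.8722677', '2.3372905')]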
In [5]:
### Solution - Exercise 1  
def parsegpx_string(xmlstring:str)->list:
    ### BEGIN SOLUTION
    import re
    pattern = re.compile(r'''<trkpt lat="([\+\-0-9\.]+)" lon="([\+\-0-9\.]+)">''')
    return [{'lat':lat,'lon':lon} for lat, lon in pattern.findall(xmlstring)]
    ### END SOLUTION

### Demo function call
demo_ex0_xmlstring = utils.load_object_from_publicdata('demo_ex0_xmlstring.dill')
demo_ex0_output = parsegpx_string(demo_ex0_xmlstring)
pprint(demo_ex0_output)
[{'lat': '+48.8722677', 'lon': '-2.3372904999999995'},
 {'lat': '48.872473199999995', 'lon': '2.3373655'},
 {'lat': '48.8724855', 'lon': '2.3374103000000006'},
 {'lat': '48.874067499999995', 'lon': '2.3454081'},
 {'lat': '48.8740639', 'lon': '2.3464527'},
 {'lat': '48.874060699999994', 'lon': '2.347187'},
 {'lat': '48.872178299999995', 'lon': '2.3501147999999996'}]

The demo should display this printed output.

[{'lat': '+48.8722677', 'lon': '-2.3372904999999995'},
 {'lat': '48.872473199999995', 'lon': '2.3373655'},
 {'lat': '48.8724855', 'lon': '2.3374103000000006'},
 {'lat': '48.874067499999995', 'lon': '2.3454081'},
 {'lat': '48.8740639', 'lon': '2.3464527'},
 {'lat': '48.874060699999994', 'lon': '2.347187'},
 {'lat': '48.872178299999995', 'lon': '2.3501147999999996'}]


The cell below will test your solution for parsegpx_string (exercise 1). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [6]:
### Test Cell - Exercise 1  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=parsegpx_string,
              ex_name='parsegpx_string',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to parsegpx_string did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=parsegpx_string,
              ex_name='parsegpx_string',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to parsegpx_string did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.14 seconds
memory after test: 3.07 MB
memory peak during test: 4.41 MB
initial memory usage: 0.00 MB
Test duration: 0.07 seconds
memory after test: 0.06 MB
memory peak during test: 1.43 MB
Passed! Please submit.

Exercise 2: (2 points)

next_destination_frequency

Your task: define next_destination_frequency as follows:

Implement a function which calculates the conditional probability of going from an origin point a to the next point b.

Inputs:

  • run_records: A dictionary whose keys are run_ids and whose values are ordered lists of the points that the runner ran to.

Return:

  • next_dest_dict: a nested dictionary where:
    • the first key is the origin point a
    • the second key is the next point b
    • the value is the conditional probability of running from a to b based on starting at a, rounded to 3 decimal places

Simple Example:

  • Suppose there are 3 runs with Point A as the origin point:

    {0: ['A', 'B', 'C'], 1: ['A','B'], 2: ['A','C']}

  • Point A to point B happens 2 out of 3 times. Therefore, its conditional probability would be 2/3 = .667
  • Point A to point C happens 1 out of 3 times. Therefore, its conditional probability would be 1/3 = .333
  • Point B to point C happens 1 out of 1 time. Therefore, its conditional probability would be 1/1 = 1.000
  • The resulting output would look like the following:

    {'A': {'B': 0.667, 'C': 0.333}, 'B': {'C': 1.000} }

Requirements/steps:

  • The module collections has Counter and defaultdict, which may be helpful but are not required (see the short sketch below).
  • Round only the final value to 3 decimal places. Do not round each intermediate calculation.
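
For intuition only, here is a minimal sketch of the counting step on the simple example above, using collections; the normalization and rounding described above would follow.

from collections import Counter, defaultdict

runs = {0: ['A', 'B', 'C'], 1: ['A', 'B'], 2: ['A', 'C']}
counts = defaultdict(Counter)
for points in runs.values():
    for origin, dest in zip(points, points[1:]):   # consecutive pairs
        counts[origin][dest] += 1
print(dict(counts))   # {'A': Counter({'B': 2, 'C': 1}), 'B': Counter({'C': 1})}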
In [7]:
### Solution - Exercise 2  
def next_destination_frequency(run_records: dict) -> dict:
    ### BEGIN SOLUTION
    from collections import defaultdict, Counter
    
    next_dest_dict = defaultdict(Counter)

    for run_id, dest_list in run_records.items():
        for i in range(0, len(dest_list)-1):
            origin = dest_list[i]
            dest = dest_list[i+1]
            next_dest_dict[origin][dest] += 1

    for key in next_dest_dict.keys():
        total = sum(next_dest_dict[key].values())
        for inner_key, value in next_dest_dict[key].items():
            next_dest_dict[key][inner_key] = round(value / total,3)
    next_dest_dict = dict(next_dest_dict)

    return {k:{kk:vv for kk,vv in v.items()} for k,v in next_dest_dict.items()} #remove Counter from dict outputs
    ### END SOLUTION

### Demo function call
demo_run_records = utils.load_object_from_publicdata('demo_run_records.dill')
demo_next_dest_dict = next_destination_frequency(demo_run_records)
pprint(demo_next_dest_dict)
{'Apollo Theater': {'Bronx Zoo': 0.333,
                    'Central Park': 0.333,
                    'Citi Field': 0.333},
 'Battery Park': {'Apollo Theater': 0.667, 'Brooklyn Public Library': 0.333},
 'Bronx Zoo': {'Battery Park': 0.5, 'Brooklyn Public Library': 0.5},
 'Brooklyn Bridge': {'Central Park': 0.5, 'The Sphere': 0.5},
 'Brooklyn Public Library': {'Battery Park': 0.333,
                             'Bronx Zoo': 0.167,
                             'Citi Field': 0.167,
                             'The Sphere': 0.333},
 'Central Park': {'Brooklyn Public Library': 0.667, 'Citi Field': 0.333},
 'Citi Field': {'Brooklyn Bridge': 0.333, 'Brooklyn Public Library': 0.667},
 'The Sphere': {'Apollo Theater': 0.333,
                'Brooklyn Bridge': 0.333,
                'Central Park': 0.333}}

The demo should display this printed output.

{'Apollo Theater': {'Bronx Zoo': 0.333,
                    'Central Park': 0.333,
                    'Citi Field': 0.333},
 'Battery Park': {'Apollo Theater': 0.667, 'Brooklyn Public Library': 0.333},
 'Bronx Zoo': {'Battery Park': 0.5, 'Brooklyn Public Library': 0.5},
 'Brooklyn Bridge': {'Central Park': 0.5, 'The Sphere': 0.5},
 'Brooklyn Public Library': {'Battery Park': 0.333,
                             'Bronx Zoo': 0.167,
                             'Citi Field': 0.167,
                             'The Sphere': 0.333},
 'Central Park': {'Brooklyn Public Library': 0.667, 'Citi Field': 0.333},
 'Citi Field': {'Brooklyn Bridge': 0.333, 'Brooklyn Public Library': 0.667},
 'The Sphere': {'Apollo Theater': 0.333,
                'Brooklyn Bridge': 0.333,
                'Central Park': 0.333}}


The cell below will test your solution for next_destination_frequency (exercise 2). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [8]:
### Test Cell - Exercise 2  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=next_destination_frequency,
              ex_name='next_destination_frequency',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to next_destination_frequency did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=next_destination_frequency,
              ex_name='next_destination_frequency',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to next_destination_frequency did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.11 seconds
memory after test: 0.05 MB
memory peak during test: 1.38 MB
initial memory usage: 0.00 MB
Test duration: 0.11 seconds
memory after test: 0.04 MB
memory peak during test: 1.37 MB
Passed! Please submit.

Exercise 3: (2 points)

getdistances

Your task: define getdistances as follows:

Implement a function to calculate the distance between each pair of consecutive latitude/longitude points.

Inputs:

  • d: A Pandas DataFrame which contains lat and lon columns

Return:

  • distances: a list of distances, rounded to 5 decimal places, between subsequent points of latitudes/longitudes

Requirements/steps:

  • Remember, latitudes and longitudes are still strings from Ex1.
  • Implement the Great-circle distance $d$ as detailed below: $$ d=r\,\Delta \sigma $$
    • where:
      • $r$ is the Earth's radius (radius = 3,956 miles)
      • $\Delta \sigma$ is the central angle between coordinates and is represented below
        • $\phi_1$ and $\phi_2$ are the latitudes of two points 1 and 2 respectively in radians
        • $\lambda_1$ and $\lambda_2$ are the longitudes of two points 1 and 2 respectively in radians
        • $\Delta \phi = \phi_2 - \phi_1$ (difference in latitudes).
        • $\Delta \lambda = \lambda_2 - \lambda_1$ (difference in longitudes)
$$ \Delta \sigma =\arccos {\bigl (}\sin \phi _{1}\sin \phi _{2}+\cos \phi _{1}\cos \phi _{2}\cos \Delta \lambda {\bigr )} $$
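
As a quick numerical illustration (a sketch only; the coordinates below are two arbitrary points near Paris, not taken from any graded test case), the formula translates directly into math-module calls:

from math import radians, sin, cos, acos

r = 3956   # Earth's radius in miles, as given above
lat1, lon1 = radians(48.8706), radians(2.3474)   # arbitrary point 1
lat2, lon2 = radians(48.8729), radians(2.3432)   # arbitrary point 2
dsigma = acos(sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(lon2 - lon1))
print(round(r * dsigma, 5))   # distance in miles between the two points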
In [9]:
### Solution - Exercise 3  
def getdistances(d: pd.DataFrame) -> list:

    from math import sin, cos, sqrt, acos, radians
    
    r=3956
    
    ### BEGIN SOLUTION
    lon=[float(i) for i in d['lon'].to_list()]
    lat=[float(i) for i in d['lat'].to_list()]
    distances=[]
    for i in range(len(lon)-1):
        
        lat1=radians(lat[i])
        lon1=radians(lon[i])
        lat2=radians(lat[i+1])
        lon2=radians(lon[i+1])

        dlon = lon2 - lon1

        a = sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(dlon)
        central_angle = acos(a)
        distances.append(round(r * central_angle,5))
    return distances
    ### END SOLUTION

### Demo function call
demo_getdistances_d = utils.load_object_from_publicdata('demo_getdistances_d.dill')
demo_getdistances_output = getdistances(demo_getdistances_d)
pprint(demo_getdistances_output)
[0.24721, 0.20106, 0.40918, 0.14515]

The demo should display this printed output.

[0.24721, 0.20106, 0.40918, 0.14515]


The cell below will test your solution for getdistances (exercise 3). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [10]:
### Test Cell - Exercise 3  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=getdistances,
              ex_name='getdistances',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to getdistances did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=getdistances,
              ex_name='getdistances',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to getdistances did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.12 seconds
memory after test: 0.08 MB
memory peak during test: 1.37 MB
initial memory usage: 0.00 MB
Test duration: 0.12 seconds
memory after test: 0.05 MB
memory peak during test: 1.36 MB
Passed! Please submit.

Exercise 4: (2 points)

geopandasdata

Your task: define geopandasdata as follows:

Implement a function, geopandasdata(d, crs), which produces a GeoPandas GeoDataFrame object. Maintain the same row ordering as the input DataFrame.

Inputs:

  • d: A pandas DataFrame with lat and lon as strings
  • crs: a string representing the Coordinate Reference System (CRS) which tells Python how those coordinates relate to places on Earth

Return:

  • path_gdf: a GeoPandas GeoDataFrame to be used for mapping

Requirements/steps:

  • Create a GeoPandas GeoDataFrame
    • lat and lon columns from input d.
    • geometry column, which holds a GeometryArray. Hint: you should use points_from_xy (see the short sketch below).
    • crs should be set to the crs input variable
    • Do not modify the input DataFrame d
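
For intuition only, a minimal sketch of points_from_xy on a toy two-row DataFrame; note that it takes x (longitude) first and y (latitude) second, and that the string coordinates need to be converted to floats.

import geopandas as gpd
import pandas as pd

toy = pd.DataFrame({'lat': ['48.87', '48.88'], 'lon': ['2.34', '2.35']})   # toy string coordinates
toy_gdf = gpd.GeoDataFrame(
    toy.copy(),
    geometry=gpd.points_from_xy(toy['lon'].astype(float), toy['lat'].astype(float)),
    crs='EPSG:4326',
)
print(toy_gdf)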
In [11]:
### Solution - Exercise 4  
def geopandasdata(d: pd.DataFrame, crs: str='EPSG:4326') -> 'gpd.GeoDataFrame':
    import geopandas as gpd
    d2=d.copy(deep=True)
    ### BEGIN SOLUTION
    def getpathcoords(d: pd.DataFrame) -> list:
        def getlatlon(d):
            lon=[float(i) for i in d['lon'].to_list()]
            lat=[float(i) for i in d['lat'].to_list()]
            return lat,lon
        lat, lon=getlatlon(d)
        return [(lat[i],lon[i]) for i in range(len(lon))]
    path_coords=getpathcoords(d2)
    path_gdf=gpd.GeoDataFrame(
      d2,
      geometry=gpd.points_from_xy([c[1] for c in path_coords],[c[0] for c in path_coords]),
      crs=crs
    )
    return path_gdf
    ### END SOLUTION

### Demo function call
demo_getdistances_d = utils.load_object_from_publicdata('demo_getdistances_d.dill')
demo_getpathcoords_output = geopandasdata(demo_getdistances_d)
display(demo_getpathcoords_output)
lat lon geometry
0 48.8706362 2.3473592 POINT (2.34736 48.87064)
1 48.872920199999996 2.3431671000000005 POINT (2.34317 48.87292)
2 48.875697599999995 2.3418363000000006 POINT (2.34184 48.87570)
3 48.8738757 2.3504104000000003 POINT (2.35041 48.87388)
4 48.8740889 2.3472305999999996 POINT (2.34723 48.87409)

The demo should display this output.

lat lon geometry
0 48.8706362 2.3473592 POINT (2.34736 48.87064)
1 48.872920199999996 2.3431671000000005 POINT (2.34317 48.87292)
2 48.875697599999995 2.3418363000000006 POINT (2.34184 48.87570)
3 48.8738757 2.3504104000000003 POINT (2.35041 48.87388)
4 48.8740889 2.3472305999999996 POINT (2.34723 48.87409)


The cell below will test your solution for geopandasdata (exercise 4). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [12]:
### Test Cell - Exercise 4  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.handle_gdf(geopandasdata),
              ex_name='geopandasdata',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=20)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to geopandasdata did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=plugins.handle_gdf(geopandasdata),
              ex_name='geopandasdata',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=20,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to geopandasdata did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.77 seconds
memory after test: 0.20 MB
memory peak during test: 1.39 MB
initial memory usage: 0.00 MB
Test duration: 0.75 seconds
memory after test: 0.10 MB
memory peak during test: 1.36 MB
Passed! Please submit.
In [13]:
### Run Me!!!
demo_result_geopandasdata_TRUE = utils.load_object_from_publicdata('demo_result_geopandasdata_TRUE')

Now that we can extract path coordinates as well as latitudes and longitudes, we can plot the routes on an image using matplotlib. Assume that data has all the latitudes and longitudes from the original GPX file. We'll use these generated png files later to build a machine learning model to classify the type of shape.

import matplotlib.pyplot as plt
ax=geopandasdata(data).plot()
# plt.savefig(f'resource/asnlib/publicdata/pi_shaperun_paris_2.png')
plt.show()

(Matplotlib rendering of the route produced by the code above.)

Additionally, we can use an interactive map plotting library such as Folium to plot our gpx on a map.

In [14]:
# https://stackoverflow.com/questions/60578408/is-it-possible-to-draw-paths-in-folium
# https://stackoverflow.com/questions/71831698/creating-a-folium-map-with-markers-with-different-colors
import folium
from statistics import mean

def plotfolium(path_coords, latlon):
    lat, lon = latlon
    m = folium.Map(location=[mean(lat), mean(lon)], zoom_start=14, tiles="CartoDB positron")
    folium.PolyLine(path_coords, color="red", opacity=.9).add_to(m)
    return m

pathcoords=utils.load_object_from_publicdata('demo_pathcoords.dill')
latlon=utils.load_object_from_publicdata('demo_latlon.dill')
plotfolium(pathcoords,latlon)
Out[14]:

Exercise 5: (2 points)

convert_distances

Your task: define convert_distances as follows:

Implement a function which converts and standardizes data to either miles or kilometers.

Inputs:

  • df_runs: A Pandas DataFrame containing distance and unit columns
  • final_unit: a string, either miles or kilometers

Return:

  • df_runs_copy: a Pandas DataFrame containing distance and unit columns:
    • the distance column needs to be rounded to 3 decimal places

Requirements/steps:

  • Using miles_to_km, convert and standardize the data in the df_runs DataFrame to the requested final_unit (see the toy example below).
  • Remember, 1 mile is 1.609 kilometers. This may be helpful as you're checking your function's output.
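
As a toy example (a sketch, not the graded solution; the numbers are made up), converting a small frame to miles with a boolean mask might look like this:

import pandas as pd

toy = pd.DataFrame({'distance': [3.0, 5.0], 'unit': ['kilometers', 'miles']})
mask = toy['unit'] == 'kilometers'
toy.loc[mask, 'distance'] = toy.loc[mask, 'distance'] / 1.609   # kilometers -> miles
toy['unit'] = 'miles'
print(toy.round(3))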
In [15]:
### Solution - Exercise 5  
def convert_distances(df_runs: pd.DataFrame, final_unit: str) -> pd.DataFrame:
    miles_to_km = 1.609
    
    ### BEGIN SOLUTION
    df_runs_copy = df_runs.copy()
    
    if final_unit == 'miles':
        df_runs_copy.loc[df_runs_copy['unit'] != 'miles', 'distance'] = df_runs_copy.loc[df_runs_copy['unit'] != 'miles', 'distance'] / miles_to_km
    else:
        df_runs_copy.loc[df_runs_copy['unit'] != 'kilometers', 'distance'] = df_runs_copy.loc[df_runs_copy['unit'] != 'kilometers', 'distance'] * miles_to_km
    df_runs_copy['unit'] = final_unit
    df_runs_copy['distance'] = df_runs_copy['distance'].round(3)
    return df_runs_copy
    ### END SOLUTION

### Demo function call
demo_convert_distances_df_runs = utils.load_object_from_publicdata('demo_convert_distances_df_runs.dill')
demo_convert_distances_output = convert_distances(demo_convert_distances_df_runs, 'miles')
display(demo_convert_distances_output)
distance unit
0 1.865 miles
1 0.622 miles
2 9.000 miles
3 1.865 miles
4 9.000 miles

The demo should display this output.

distance unit
0 1.865 miles
1 0.622 miles
2 9.000 miles
3 1.865 miles
4 9.000 miles


The cell below will test your solution for convert_distances (exercise 5). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [16]:
### Test Cell - Exercise 5  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=convert_distances,
              ex_name='convert_distances',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to convert_distances did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=convert_distances,
              ex_name='convert_distances',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to convert_distances did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.23 seconds
memory after test: 0.11 MB
memory peak during test: 1.38 MB
initial memory usage: 0.00 MB
Test duration: 0.23 seconds
memory after test: 0.08 MB
memory peak during test: 1.36 MB
Passed! Please submit.
In [17]:
### Run Me!!!
demo_result_convert_distances_TRUE = utils.load_object_from_publicdata('demo_result_convert_distances_TRUE')

Exercise 6: (2 points)

heartratedrift

Your task: define heartratedrift_query as follows:

Write a SQL query to calculate heart rate drift.

Requirements/steps:

Data is from one-hour runs. For every user, their speed (mph) and heart rate (bpm) are tabulated for each minute of their one-hour run, from minute 0 to minute 59.

user minute speed heart_rate
001 0 4.1 136
001 1 4.5 135
001 2 3.9 135
001 3 3.8 136

We want to calculate the heart rate drift % for each runner over the hour period. This is calculated by:

$$ \frac {\frac{\bar{s_1}} {\bar{h_1}} - \frac{\bar{s_2}} {\bar{h_2}}} {\frac{\bar{s_1}} {\bar{h_1}}} $$
  • $\bar{s_1}$ is the average speed for the first half of the run (minute 0 to minute 29)
  • $\bar{s_2}$ is the average speed for the second half of the run (minute 30 to minute 59)
  • $\bar{h_1}$ is the average heart rate for the first half of the run (minute 0 to minute 29)
  • $\bar{h_2}$ is the average heart rate for the second half of the run (minute 30 to minute 59)

Your query should return one row per runner with their heart rate drift %, for example:

user heart_rate_drift
001 0.038
002 0.012
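
One way to sanity-check a query like this (a sketch only, assuming the df_heartrate table and the conn connection defined earlier) is to recompute the drift for a single user in pandas:

df = pd.read_sql_query('SELECT * FROM df_heartrate', conn)
one_user = df[df['user'] == df['user'].iloc[0]]            # pick an arbitrary user
first = one_user[one_user['minute'] <= 29]
second = one_user[one_user['minute'] >= 30]
ratio1 = first['speed'].mean() / first['heart_rate'].mean()
ratio2 = second['speed'].mean() / second['heart_rate'].mean()
print((ratio1 - ratio2) / ratio1)                          # should match your query for that user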
In [18]:
### Solution - Exercise 6  
heartratedrift_query = '''YOUR QUERY HERE'''
### BEGIN SOLUTION
heartratedrift_query = '''
    WITH user_stats AS (
      SELECT
          user
        , AVG(CASE WHEN minute <= 29 THEN speed ELSE NULL END)      AS first_half_speed
        , AVG(CASE WHEN minute <= 29 THEN heart_rate ELSE NULL END) AS first_half_heart_rate
        , AVG(CASE WHEN minute > 29 THEN speed ELSE NULL END)       AS second_half_speed
        , AVG(CASE WHEN minute > 29 THEN heart_rate ELSE NULL END)  AS second_half_heart_rate
      FROM df_heartrate
      GROUP BY 1
      )
      SELECT
          user
        , ((first_half_speed / first_half_heart_rate) - (second_half_speed / second_half_heart_rate)) / (first_half_speed / first_half_heart_rate) AS heart_rate_drift
      FROM user_stats
'''
### END SOLUTION


### Demo function call
demo_result_heartratedrift = pd.read_sql(heartratedrift_query, conn)
demo_result_heartratedrift_output=demo_result_heartratedrift[demo_result_heartratedrift['user']=='1']
display(demo_result_heartratedrift_output)
user heart_rate_drift
1 1 -0.007659

The demo should display this output.

user heart_rate_drift
1 1 -0.007659


The cell below will test your solution for heartratedrift (exercise 6). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [19]:
### Test Cell - Exercise 6  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(heartratedrift_query),
              ex_name='heartratedrift',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=10)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to heartratedrift did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(heartratedrift_query),
              ex_name='heartratedrift',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=10,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to heartratedrift did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.16 seconds
memory after test: 0.12 MB
memory peak during test: 2.41 MB
initial memory usage: 0.00 MB
Test duration: 0.16 seconds
memory after test: 0.07 MB
memory peak during test: 2.42 MB
Passed! Please submit.
In [20]:
### Run Me!!!
demo_result_heartratedrift_TRUE = utils.load_object_from_publicdata('demo_result_heartratedrift_TRUE')

Exercise 7: (3 points)

race_winners

Your task: define race_winners_query as follows:

Write a SQL query that, for each race, returns the top 3 finishers and each finisher's time delta relative to the runner who finished immediately ahead of them.

Requirements/steps:

  • Use the df_races table
  • You may need a CTE or subquery
  • Window functions and LAG may be helpful (see the short sketch after the example below).
  • For every Race, determine the top 3 Runner finishers and their time Delta from the finisher in front of them
    • Time is in minutes
    • You can assume names are unique
    • There are no ties
    • The first place finisher will have a Delta of NaN because no one finished ahead of them.
  • Input:
Race Runner Time
marathon Zac 248.7
marathon Hannah 241.5
marathon Bella 242.9
marathon Liam 236.7
marathon Riley 224.6
  • Output:
Race Runner Delta
marathon Riley NaN
marathon Liam 12.1
marathon Hannah 4.8
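
For intuition only, here is how LAG behaves on the toy marathon rows above, using a throwaway in-memory SQLite database (a sketch; your graded query must run against df_races in the exam database and also limit each race to its top 3 finishers):

import sqlite3
import pandas as pd

toy = pd.DataFrame({
    'Race':   ['marathon'] * 5,
    'Runner': ['Zac', 'Hannah', 'Bella', 'Liam', 'Riley'],
    'Time':   [248.7, 241.5, 242.9, 236.7, 224.6],
})
tmp = sqlite3.connect(':memory:')
toy.to_sql('df_races', tmp, index=False)
q = '''
    SELECT Race, Runner,
           Time - LAG(Time) OVER (PARTITION BY Race ORDER BY Time) AS Delta
    FROM df_races
'''
print(pd.read_sql(q, tmp))   # every finisher with their delta; the fastest runner's delta is NULL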
In [21]:
### Solution - Exercise 7  
race_winners_query = '''YOUR QUERY HERE'''
### BEGIN SOLUTION
race_winners_query = '''
    WITH ranked_by_race AS (
        SELECT 
            Race
          , Runner
          , Time
          , ROW_NUMBER() OVER(PARTITION BY Race ORDER BY Time) AS rownum
          , LAG(Time) OVER(PARTITION BY Race ORDER BY Time)    AS prev_runner_time
        FROM df_races
    )

    SELECT 
        Race
      , Runner
      , Time - prev_runner_time AS Delta
    FROM ranked_by_race
    WHERE rownum < 4
'''
### END SOLUTION


### Demo function call
demo_result_race_winners = pd.read_sql(race_winners_query, conn)
display(demo_result_race_winners)
Race Runner Delta
0 400m Mia NaN
1 400m Matthew 0.2
2 400m Andrew 0.1
3 5k Alexander NaN
4 5k Chloe 0.5
5 5k Harper 5.3
6 half marathon Carter NaN
7 half marathon Zoe 1.0
8 half marathon Andrew 1.0
9 marathon Riley NaN
10 marathon Liam 12.1
11 marathon Hannah 4.8
12 turkey trot Carter NaN
13 turkey trot Zoe 2.0
14 turkey trot Andrew 2.0

The demo should display this output.

Race Runner Delta
0 400m Mia NaN
1 400m Matthew 0.2
2 400m Andrew 0.1
3 5k Alexander NaN
4 5k Chloe 0.5
5 5k Harper 5.3
6 half marathon Carter NaN
7 half marathon Zoe 1.0
8 half marathon Andrew 1.0
9 marathon Riley NaN
10 marathon Liam 12.1
11 marathon Hannah 4.8
12 turkey trot Carter NaN
13 turkey trot Zoe 2.0
14 turkey trot Andrew 2.0


The cell below will test your solution for race_winners (exercise 7). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [22]:
### Test Cell - Exercise 7  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(race_winners_query),
              ex_name='race_winners',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=10)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to race_winners did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(race_winners_query),
              ex_name='race_winners',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=10,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to race_winners did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.16 seconds
memory after test: 0.07 MB
memory peak during test: 1.37 MB
initial memory usage: 0.00 MB
Test duration: 0.16 seconds
memory after test: 0.05 MB
memory peak during test: 1.36 MB
Passed! Please submit.
In [23]:
### Run Me!!!
demo_result_race_winners_TRUE = utils.load_object_from_publicdata('demo_result_race_winners_TRUE')

Build a Model

Exercise 8: (1 point)

label_filenames

Your task: define label_filenames as follows:

Implement a function, label_filenames, which produces a list of tuples.

Inputs:

  • filenames: A list of filenames
  • firstclass: a string

Return:

  • labeled_filenames: a list of tuples, where each tuple is (label, filename)

Requirements/steps:

  • If the firstclass string is in the filename, label=0.
  • Else (where the firstclass string is NOT in the filename), label=1.
  • Tuples need to be returned in the order that they are encountered in filenames (see the short sketch below).
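
For intuition only, the labeling rule applied to a couple of made-up filenames:

names = ['runs/pi_route.png', 'runs/rose_route.png']   # made-up filenames
print([(0 if 'pi' in name else 1, name) for name in names])
# [(0, 'runs/pi_route.png'), (1, 'runs/rose_route.png')]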
In [24]:
### Solution - Exercise 8  
def label_filenames(filenames:list,firstclass:str='pi') -> list: 
    ### BEGIN SOLUTION
    return [(0 if firstclass in filename else 1,filename) for filename in filenames]
    ### END SOLUTION

### Demo function call
filenames=['resource/asnlib/publicdata/pi.png','resource/asnlib/publicdata/rose.png','resource/asnlib/publicdata/pinyc.png']
demo_label_filenames_output = label_filenames(filenames,'pi')
pprint(demo_label_filenames_output)
[(0, 'resource/asnlib/publicdata/pi.png'),
 (1, 'resource/asnlib/publicdata/rose.png'),
 (0, 'resource/asnlib/publicdata/pinyc.png')]

The demo should display this printed output.

[(0, 'resource/asnlib/publicdata/pi.png'),
 (1, 'resource/asnlib/publicdata/rose.png'),
 (0, 'resource/asnlib/publicdata/pinyc.png')]


The cell below will test your solution for label_filenames (exercise 8). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [25]:
### Test Cell - Exercise 8  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=label_filenames,
              ex_name='label_filenames',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to label_filenames did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=label_filenames,
              ex_name='label_filenames',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to label_filenames did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.07 seconds
memory after test: 0.02 MB
memory peak during test: 1.37 MB
initial memory usage: 0.00 MB
Test duration: 0.07 seconds
memory after test: 0.02 MB
memory peak during test: 1.36 MB
Passed! Please submit.

Exercise 9: (2 points)

create_model_data

Your task: define create_model_data as follows:

Implement a function, create_model_data, which produces a tuple.

Inputs:

  • label_filenames: A list of tuples (label,filename)
  • array_size: a tuple representing the preferred image size. Defaults to (4,4).

Return:

  • model_data: a tuple representing:
    • a list of numpy.ndarrays of independent variables (X) as the first element
    • a list of int of dependent variables (Y) as the second element

Requirements/steps:

  • For each tuple in the label_filenames list:
    • Using the cv2 package, read the image into a numpy.ndarray via img_arr=cv2.imread(filename)[...,::-1] (see the note on this slice below)
    • Resize that image array to array_size image size, which defaults to (4,4), using cv2.resize
  • Return your tuple of:
    • a list of resized independent variables
    • a list of labeled dependent variables
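
A note on the [...,::-1] slice (a minimal sketch using a tiny synthetic array instead of a real image): cv2.imread returns pixel channels in BGR order, and reversing the last axis flips them to RGB.

import numpy as np

bgr = np.array([[[255, 0, 0]]], dtype=np.uint8)   # one pure-blue pixel in BGR order
rgb = bgr[..., ::-1]                              # reverse the channel axis
print(rgb)                                        # [[[  0   0 255]]] -- the same blue pixel, now in RGB order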
In [26]:
### Solution - Exercise 9  
def create_model_data(label_filenames:list,array_size:tuple=(4,4)) -> tuple: 
    import cv2
    # img_arr=cv2.imread(filename)[...,::-1] ### You will use this!
    ### BEGIN SOLUTION
    data=[]
    ydata=[]
    for l in label_filenames:
        label,filename=l
        img_arr=cv2.imread(filename)[...,::-1]
        resized_arr=cv2.resize(img_arr,array_size)
        data.append(resized_arr)
        ydata.append(label)
    return data,ydata
    ### END SOLUTION

### Demo function call
label_filenames=[(0,'resource/asnlib/publicdata/pi.png'),(1,'resource/asnlib/publicdata/rose.png')]
demo_create_model_data_output = create_model_data(label_filenames)
pprint(demo_create_model_data_output)
([array([[[255, 255, 255],
        [140, 198, 197],
        [140, 198, 197],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [136, 136, 136],
        [144, 204, 203],
        [255, 255, 255]]], dtype=uint8),
  array([[[255, 255, 255],
        [140, 198, 197],
        [140, 198, 197],
        [255, 255, 255]],

       [[157, 157, 157],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [235, 235, 235],
        [238, 238, 238],
        [255, 255, 255]]], dtype=uint8)],
 [0, 1])

The demo should display this printed output.

([array([[[255, 255, 255],
        [140, 198, 197],
        [140, 198, 197],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [136, 136, 136],
        [144, 204, 203],
        [255, 255, 255]]], dtype=uint8),
  array([[[255, 255, 255],
        [140, 198, 197],
        [140, 198, 197],
        [255, 255, 255]],

       [[157, 157, 157],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [144, 204, 203],
        [144, 204, 203],
        [255, 255, 255]],

       [[255, 255, 255],
        [235, 235, 235],
        [238, 238, 238],
        [255, 255, 255]]], dtype=uint8)],
 [0, 1])


The cell below will test your solution for create_model_data (exercise 9). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [27]:
### Test Cell - Exercise 9  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=create_model_data,
              ex_name='create_model_data',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=20)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to create_model_data did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=create_model_data,
              ex_name='create_model_data',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=20,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to create_model_data did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 1.05 seconds
memory after test: 0.09 MB
memory peak during test: 1.98 MB
initial memory usage: 0.00 MB
Test duration: 1.22 seconds
memory after test: 0.07 MB
memory peak during test: 1.94 MB
Passed! Please submit.

This isn't going to be the best classification model, given how few images we fed it. Nonetheless, let's train on 7 images and test on 3 to see how effective the model is. Based on the resulting predictions, we would likely want to increase the input dimensions and add layers to the Convolutional Neural Network. Below you'll find the code.

import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
#training
label_filenames=[(0,'gpsartproto/pi.png'),(1,'gpsartproto/rose.png'),(0,'gpsartproto/pinyc.png'),(1,'gpsartproto/rosenyc.png'),(0,'gpsartproto/pisf.png'),(1,'gpsartproto/roseboston.png'),(0,'gpsartproto/piboston.png')]
data,ydata=create_model_data(label_filenames)
dix_train=np.array(data)
diy_train=np.array(ydata).reshape(-1,1)

#testing
label_filenames=[(1,'gpsartproto/rosearlington.png'),(1,'gpsartproto/rosesf.png'),(0,'gpsartproto/pi_shaperun_paris.png')]
data,ydata=create_model_data(label_filenames)
dix_test=np.array(data)
diy_test=np.array(ydata).reshape(-1,1)

di_train, di_test = dix_train / 255.0, dix_test / 255.0

##train model
#define CNN for gpsartify
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),

    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    layers.Dense(2, activation='softmax')  # 2 classes in gpsartify
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

## fit the model
history = model.fit(di_train, diy_train, epochs=10, validation_split=0.2)

#make predictions from model
predictions = model.predict(di_test)
class_names = ['pi','rose']

def show_prediction(index):
    plt.figure(figsize=(5,5))
    plt.imshow(di_test[index])
    pred_label = class_names[np.argmax(predictions[index])]
    print(diy_test[index])
    true_label = class_names[diy_test[index][0]]
    plt.title(f"Predicted: {pred_label}\nActual: {true_label}")
    plt.axis('off')
    plt.savefig(f'model_prediction_{index}.png')
    plt.show()

for i in range(3):
  show_prediction(i)

[Figures: model_prediction_0.png, model_prediction_1.png, model_prediction_2.png — predicted vs. actual class for each of the three test images]
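The per-image plots above are qualitative. To attach a number to them, one option is to score the model on the held-out set. A minimal sketch, assuming the model, di_test, and diy_test built above are still in scope (with only 3 test images, the accuracy will be very noisy):

# Overall loss/accuracy on the held-out images
test_loss, test_acc = model.evaluate(di_test, diy_test, verbose=0)
print(f"test accuracy: {test_acc:.2f}")

# Predicted class index per image (0 = pi, 1 = rose) vs. the true labels
pred_classes = np.argmax(model.predict(di_test), axis=1)
print(pred_classes, diy_test.ravel())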

Create a running route

Suppose we had an image and we wanted to create a running route by overlaying that image onto a map.

Exercise 10: (1 points)

shift_svg

Your task: define shift_svg as follows:

Implement a function, shift_svg, which produces a numpy.ndarray.

Inputs:

  • sampled_points: A list of (x, y) coordinate tuples sampled from the SVG image

Return:

  • shifted_points: A 2-D Numpy array of shifted coordinates (x', y'), where "shifting" is defined below. If there are m input points, then shifted_points is an m x 2 Numpy array.

Requirements/steps:

  • Let x_min be the smallest of the x input coordinates, and let y_min be the smallest of the y input coordinates.
  • For each input point sampled_points[i] == (x, y), its shifted version is the point (x - x_min, y - y_min), which should be stored in row shifted_points[i, :] of the final output.
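For a quick illustration of the shifting rule (a made-up input, not one of the graded test cases):

# x_min = 1, y_min = 0, so each point moves by (-1, 0)
shift_svg([(3, 5), (1, 2), (4, 0)])
# -> array([[2, 5],
#           [0, 2],
#           [3, 0]])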
In [28]:
### Solution - Exercise 10  
def shift_svg(sampled_points:list) -> np.ndarray: 
    ### BEGIN SOLUTION
    # Shift to (0, 0)
    sampled_points = np.array(sampled_points)
    sampled_points -= sampled_points.min(axis=0)
    return sampled_points
    ### END SOLUTION

### Demo function call
demo_sampled_points = utils.load_object_from_publicdata('demo_shift_svg_paths.dill')
demo_sampled_points = demo_sampled_points[:10]
demo_shift_svg_output = shift_svg(demo_sampled_points)
pprint(demo_shift_svg_output)
array([[  0.        , 104.89984487],
       [ 25.54484314, 109.88842341],
       [ 37.40263979, 130.14757786],
       [ 53.77403485, 152.94725888],
       [ 79.74444851, 173.11282804],
       [120.39930102, 185.4696469 ],
       [176.81714907, 181.11087016],
       [172.8895754 , 110.88946172],
       [164.59477901,  50.86536302],
       [152.86201033,   0.        ]])

The demo should display this printed output.

array([[  0.        , 104.89984487],
       [ 25.54484314, 109.88842341],
       [ 37.40263979, 130.14757786],
       [ 53.77403485, 152.94725888],
       [ 79.74444851, 173.11282804],
       [120.39930102, 185.4696469 ],
       [176.81714907, 181.11087016],
       [172.8895754 , 110.88946172],
       [164.59477901,  50.86536302],
       [152.86201033,   0.        ]])


The cell below will test your solution for shift_svg (exercise 10). The testing variables will be available for debugging under the following names in a dictionary format.

  • input_vars - Input variables for your solution.
  • original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
  • returned_output_vars - Outputs returned by your solution.
  • true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
In [29]:
### Test Cell - Exercise 10  


from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time

tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)

@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)


# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=shift_svg,
              ex_name='shift_svg',
              key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=', 
              n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to shift_svg did not pass the test.'

### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")

passed, test_case_vars, e = execute_tests(func=shift_svg,
              ex_name='shift_svg',
              key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=', 
              n_iter=21,
              hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to shift_svg did not pass the test.'
### END HIDDEN TESTS

print('Passed! Please submit.')
initial memory usage: 0.00 MB
Test duration: 0.10 seconds
memory after test: 0.11 MB
memory peak during test: 1.63 MB
initial memory usage: 0.00 MB
Test duration: 0.10 seconds
memory after test: 0.10 MB
memory peak during test: 1.66 MB
Passed! Please submit.

As mentioned above, suppose we want to create our own running route by overlaying an image onto a map of our city. Below is some code that accomplishes that. The methodology is as follows:

  • Using an SVG image, generate a sequence of points that describe the main shape
  • Overlay the shape onto a map. For this, we'll need to scale/translate those SVG points so they fit a box centered on a city latitude/longitude. We'll use Paris, France as our anchor city.
  • Use an undirected graph to snap the shape to the streets and trails. This lets us find the nearest road node for each point and connect the nodes with realistic paths. We'll use osmnx and networkx to accomplish this.
  • Export the paths and final list of latitudes/longitudes to a GPX file.

How does it look? We actually plotted our run above using the Folium library. You can juxtapose the SVG image below with our GPS-overlaid route.

[Figure: juxtaposed_image — the source SVG shape alongside the GPS-overlaid route]

#####################################
# extract points from SVG image
# !pip install svgpathtools # likely need to run to get package
from svgpathtools import svg2paths

# Load your SVG file
paths, attributes = svg2paths("gpsartproto/Pi-symbol.svg")
sampled_points = []
for path in paths:
    for t in np.linspace(0, 1, 100):  # adjust 100 to change resolution
        pt = path.point(t)
        sampled_points.append((pt.real, -pt.imag))
# RUN the normalize_svg code from earlier here; sampled_points must end up as a NumPy array for the slicing/scaling below

# plot svg sampled points to confirm
plt.plot(sampled_points[:, 0], sampled_points[:, 1], 'k')  # Y was already flipped at sampling time via -pt.imag
plt.gca().set_aspect('equal')
plt.title("pi Shape from SVG")
plt.savefig(f'svg_shape_sampled_points.png')
plt.show()

#####################################
# transpose SVG image points onto a map
# !pip install osmnx #likely need to run to get package
import math, pyproj
import numpy as np, shapely.geometry as geom, shapely.affinity as affinity
import shapely.ops as ops
import osmnx as ox, networkx as nx, gpxpy.gpx

CENTER_LL = (48.87075211275222, 2.345818599425802)   # lat, lon in Paris
RADIUS    = 1200      # meters 
SCALE_M   = 2.4       # meters per *pixel* unit

# Build and project run network ───────────────────────────
G_latlon = ox.graph_from_point(CENTER_LL, dist=RADIUS, network_type="walk")
G = ox.project_graph(G_latlon)
crs_proj = G.graph["crs"]

# Helper to convert lat/lon into projected meters
to_proj  = pyproj.Transformer.from_crs("EPSG:4326", crs_proj, always_xy=True).transform
anchor_x, anchor_y = to_proj(CENTER_LL[1], CENTER_LL[0])

# build svg pi in the same CRS
raw_xy  = sampled_points * SCALE_M
shape_m = geom.LineString(raw_xy)

# center the shape on the anchor point
centroid = shape_m.centroid
shape_m  = affinity.translate(
    shape_m,
    xoff=anchor_x - centroid.x,
    yoff=anchor_y - centroid.y
)

# Snap every vertex to the nearest street node
def nearest_node(x, y):
    return ox.distance.nearest_nodes(G, x, y)

coords   = list(shape_m.coords)
nodes    = [nearest_node(x, y) for x, y in coords]

# Shortest-path stitching between consecutive snapped nodes
route_nodes = []
for u, v in zip(nodes[:-1], nodes[1:]):
    if u == v: # consecutive points snapped to the same node, so skip
        continue
    seg = nx.shortest_path(G, u, v, weight="length")
    route_nodes.extend(seg[:-1])
route_nodes.append(nodes[-1])

# Back to lat/lon for GPX export
G_ll  = ox.project_graph(G, to_latlong=True)
track = [(G_ll.nodes[n]["y"], G_ll.nodes[n]["x"]) for n in route_nodes]

# write GPX file
# !pip install gpxpy # likely need to run to get package 
import gpxpy
gpx = gpxpy.gpx.GPX()
trk = gpxpy.gpx.GPXTrack(); gpx.tracks.append(trk)
seg = gpxpy.gpx.GPXTrackSegment(); trk.segments.append(seg)
for lat, lon in track:
    seg.points.append(gpxpy.gpx.GPXTrackPoint(latitude=lat, longitude=lon))

with open("shape_run_paris2.gpx", "w") as f:
    f.write(gpx.to_xml())
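
The route above was rendered with the Folium library. A minimal sketch of how that could be done, assuming the shape_run_paris2.gpx file written above (the exact styling in our figure may differ):

# !pip install folium # likely need to run to get package
import folium
import gpxpy

# Read the GPX track back and collect (lat, lon) pairs
with open("shape_run_paris2.gpx") as f:
    gpx = gpxpy.parse(f)
pts = [(p.latitude, p.longitude)
       for trk in gpx.tracks for seg in trk.segments for p in seg.points]

# Center the map on the first point of the route and draw it as a polyline
m = folium.Map(location=pts[0], zoom_start=14)
folium.PolyLine(pts, weight=4).add_to(m)
m.save("shape_run_paris2_map.html")  # open this file in a browser to view the overlay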

FIN!

Congratulations on completing the Final Exam and the semester!