Compare commits
2 Commits
5aafac2025
...
5b0a0768cb
Author | SHA1 | Date | |
---|---|---|---|
|
5b0a0768cb | ||
|
43b85d5214 |
151
src/batch_distance.py
Executable file
151
src/batch_distance.py
Executable file
@ -0,0 +1,151 @@
|
||||
# Batch_Distance.py: Batch process a bunch of distance measurements.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import configparser
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
|
||||
class GeocodingError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def geocode_google(config, address):
|
||||
apikey = config['google']['apikey']
|
||||
if not apikey:
|
||||
raise GeocodingError("Google API key not specified")
|
||||
query = { 'key': apikey, 'address': address, 'region': 'us'}
|
||||
url = 'https://maps.googleapis.com/maps/api/geocode/json?' + \
|
||||
urllib.parse.urlencode(query, quote_via=urllib.parse.quote)
|
||||
with urllib.request.urlopen(url) as response:
|
||||
if response.status == 200:
|
||||
apireturn = json.loads(response.read())
|
||||
stat = apireturn['status']
|
||||
if stat == 'OK':
|
||||
results = apireturn['results']
|
||||
if len(results) > 1:
|
||||
raise GeocodingError(f"Google API returned ambiguous results (total count {len(results)})")
|
||||
coords = results[0]['geometry']['location']
|
||||
return coords['lat'], coords['lng']
|
||||
elif stat == 'ZERO_RESULTS':
|
||||
return None
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns status of {stat}")
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns {response.status} HTTP status code")
|
||||
|
||||
|
||||
OFFICE_LOCATION = (40.0187905, -105.2764775) # office location - 1433 Pearl Street, Boulder, CO
|
||||
RADIUS = 6371.0 * 1000.0 # Earth radius in meters
|
||||
METERS_PER_MILE = 1852.0 # number of meters per mile
|
||||
|
||||
|
||||
def distance_miles(point1, point2):
|
||||
Φ1 = math.radians(point1[0])
|
||||
Φ2 = math.radians(point2[0])
|
||||
ΔΦ = math.radians(point2[0] - point1[0])
|
||||
Δλ = math.radians(point2[1] - point1[1])
|
||||
a = math.sin(ΔΦ / 2.0) * math.sin(ΔΦ / 2.0) + math.cos(Φ1) * math.cos(Φ2) * math.sin(Δλ / 2.0) * math.sin(Δλ / 2.0)
|
||||
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
|
||||
return RADIUS * c / METERS_PER_MILE
|
||||
|
||||
|
||||
THRESHOLD_RADIUS = 15.0 # number of miles in "acceptable" threshold
|
||||
ERROR_BAR_WIDTH = 0.5 # number of miles in "error bar" surrounding threshold
|
||||
|
||||
|
||||
def classify_distance(dist):
|
||||
if dist > (THRESHOLD_RADIUS + ERROR_BAR_WIDTH / 2):
|
||||
return 'RED'
|
||||
if dist >= (THRESHOLD_RADIUS - ERROR_BAR_WIDTH / 2):
|
||||
return 'YELLOW'
|
||||
return 'GREEN'
|
||||
|
||||
|
||||
def transit_time_google(config, org, dest):
|
||||
apikey = config['google']['apikey']
|
||||
if not apikey:
|
||||
raise GeocodingError("Google API key not specified")
|
||||
str_origin = f"{org[0]},{org[1]}"
|
||||
str_dest = f"{dest[0]},{dest[1]}"
|
||||
query = {'key': apikey, 'origin': str_origin, 'destination': str_dest, 'mode': 'driving', 'region': 'us'}
|
||||
url = 'https://maps.googleapis.com/maps/api/directions/json?' + \
|
||||
urllib.parse.urlencode(query, quote_via=urllib.parse.quote)
|
||||
with urllib.request.urlopen(url) as response:
|
||||
if response.status == 200:
|
||||
apireturn = json.loads(response.read())
|
||||
stat = apireturn['status']
|
||||
if stat == 'OK':
|
||||
best_time = None
|
||||
best_summary = None
|
||||
for route in apireturn['routes']:
|
||||
summary = route['summary']
|
||||
current_time = 0
|
||||
for leg in route['legs']:
|
||||
current_time += leg['duration']['value']
|
||||
if best_time is None or current_time < best_time:
|
||||
best_time = current_time
|
||||
best_summary = summary
|
||||
if best_time is None:
|
||||
return None
|
||||
return best_time, best_summary
|
||||
elif stat == 'ZERO_RESULTS':
|
||||
return None
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns status of {stat}")
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns {response.status} HTTP status code")
|
||||
|
||||
|
||||
def stringize_seconds(total_secs):
|
||||
secs = total_secs % 60
|
||||
remainder = total_secs // 60
|
||||
if remainder == 0:
|
||||
return f"{secs}s"
|
||||
mins = remainder % 60
|
||||
hours = remainder // 60
|
||||
if hours == 0:
|
||||
return f"{mins}m{secs}s"
|
||||
return f"{hours}h{mins}m{secs}s"
|
||||
|
||||
|
||||
cmdline_parser = argparse.ArgumentParser()
|
||||
cmdline_parser.add_argument('input', help='The input file containing locations to be checked.')
|
||||
cmdline_parser.add_argument('output', nargs='?', help='The output file name that will receive the processed data.')
|
||||
cmdline_parser.add_argument('-C', '--config', default='geoapi.ini', help='The geocoding API configuration file')
|
||||
|
||||
|
||||
def main(args):
|
||||
opts = cmdline_parser.parse_args(args)
|
||||
config = configparser.ConfigParser()
|
||||
config.read(opts.config)
|
||||
output_name = opts.output
|
||||
if not output_name:
|
||||
output_name = opts.input + '.out'
|
||||
with open(opts.input, newline='') as input_file:
|
||||
dialect = csv.Sniffer().sniff(input_file.read(1024))
|
||||
input_file.seek(0)
|
||||
reader = csv.reader(input_file, dialect)
|
||||
with open(output_name, mode='w') as output_file:
|
||||
writer = csv.writer(output_file, dialect)
|
||||
for in_row in reader:
|
||||
coords = geocode_google(config, in_row[1])
|
||||
if coords:
|
||||
dist = distance_miles(OFFICE_LOCATION, coords)
|
||||
out_row = [in_row[0], in_row[1], coords[0], coords[1], dist, classify_distance(dist)]
|
||||
transit_time = transit_time_google(config, coords, OFFICE_LOCATION)
|
||||
if transit_time:
|
||||
out_row += [transit_time[0], stringize_seconds(transit_time[0]), transit_time[1]]
|
||||
else:
|
||||
out_row += ['*** Unable to calculate transit time']
|
||||
else:
|
||||
out_row = [in_row[0], in_row[1], '*** Unable to locate']
|
||||
writer.writerow(out_row)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(sys.argv[1:]))
|
53
src/dcalc.py
53
src/dcalc.py
@ -53,6 +53,53 @@ def distance_miles(point1, point2):
|
||||
return RADIUS * c / METERS_PER_MILE
|
||||
|
||||
|
||||
def transit_time_google(config, org, dest):
|
||||
apikey = config['google']['apikey']
|
||||
if not apikey:
|
||||
raise GeocodingError("Google API key not specified")
|
||||
str_origin = f"{org[0]},{org[1]}"
|
||||
str_dest = f"{dest[0]},{dest[1]}"
|
||||
query = {'key': apikey, 'origin': str_origin, 'destination': str_dest, 'mode': 'driving', 'region': 'us'}
|
||||
url = 'https://maps.googleapis.com/maps/api/directions/json?' + \
|
||||
urllib.parse.urlencode(query, quote_via=urllib.parse.quote)
|
||||
with urllib.request.urlopen(url) as response:
|
||||
if response.status == 200:
|
||||
apireturn = json.loads(response.read())
|
||||
stat = apireturn['status']
|
||||
if stat == 'OK':
|
||||
best_time = None
|
||||
best_summary = None
|
||||
for route in apireturn['routes']:
|
||||
summary = route['summary']
|
||||
current_time = 0
|
||||
for leg in route['legs']:
|
||||
current_time += leg['duration']['value']
|
||||
if best_time is None or current_time < best_time:
|
||||
best_time = current_time
|
||||
best_summary = summary
|
||||
if best_time is None:
|
||||
return None
|
||||
return best_time, best_summary
|
||||
elif stat == 'ZERO_RESULTS':
|
||||
return None
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns status of {stat}")
|
||||
else:
|
||||
raise GeocodingError(f"Google API returns {response.status} HTTP status code")
|
||||
|
||||
|
||||
def stringize_seconds(total_secs):
|
||||
secs = total_secs % 60
|
||||
remainder = total_secs // 60
|
||||
if remainder == 0:
|
||||
return f"{secs}s"
|
||||
mins = remainder % 60
|
||||
hours = remainder // 60
|
||||
if hours == 0:
|
||||
return f"{mins}m{secs}s"
|
||||
return f"{hours}h{mins}m{secs}s"
|
||||
|
||||
|
||||
cmdline_parser = argparse.ArgumentParser()
|
||||
cmdline_parser.add_argument('address', nargs='+', help='The address to be calculated')
|
||||
cmdline_parser.add_argument('-C', '--config', default='geoapi.ini', help='The geocoding API configuration file')
|
||||
@ -69,6 +116,12 @@ def main(args):
|
||||
print(f"Coordinates: Latitude {coords[0]}, longitude {coords[1]}")
|
||||
dist = distance_miles(OFFICE_LOCATION, coords)
|
||||
print(f"Distance from Boulder office: {dist} miles")
|
||||
transit_time = transit_time_google(config, coords, OFFICE_LOCATION)
|
||||
if transit_time:
|
||||
str_time = stringize_seconds(transit_time[0])
|
||||
print(f"Estimated transit time: {str_time} - route: {transit_time[1]} ({transit_time[0]} seconds)")
|
||||
else:
|
||||
print("Unable to calculate transit time.")
|
||||
else:
|
||||
print("Location was not found.")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user