#!/usr/bin/env python3
"""Fetch api.weather.gov gridpoint data and reshape it into hourly series."""

import datetime
import json
import re
import sys

import pytz
import requests
from geopy.geocoders import GeoNames, Nominatim

# Placeholder zone (marked "temp" in the original code) used until
# get_time_zone() is implemented.
_DEFAULT_TIMEZONE = "America/New_York"

# Format of the start-time half of a "validTime" string.
_VALID_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"

# ISO-8601 duration limited to the units datetime.timedelta can represent.
_DURATION_RE = re.compile(
    r"P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?"
)

_CARDINAL_DIRECTIONS = [
    'N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE',
    'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW',
]


def get_lat_long(location):
    """Convert a location string into a ``(latitude, longitude)`` tuple.

    The lookup goes through geopy's Nominatim geocoder. The geocoder result
    is used without a ``None`` check, so a location that cannot be resolved
    surfaces as an ``AttributeError``.
    """
    geolocator = Nominatim(user_agent="pywttr")
    match = geolocator.geocode(location)
    return match.latitude, match.longitude


def get_time_zone(latitude, longitude):
    """Not implemented yet; currently returns ``None``."""
    # TODO
    pass


def _fetch_json(url, headers=None):
    """GET *url* and decode the response body as JSON."""
    return json.loads(requests.get(url, headers=headers).text)


def get_grid_data(latitude, longitude):
    """Return the grid id, x and y for a latitude/longitude pair.

    Returns:
        A dict with the keys ``"grid_id"``, ``"grid_x"`` and ``"grid_y"``.
    """
    properties = _fetch_json(
        f"https://api.weather.gov/points/{latitude},{longitude}"
    )["properties"]
    return {
        "grid_id": properties["gridId"],
        "grid_x": properties["gridX"],
        "grid_y": properties["gridY"],
    }


def get_raw_data(grid_id, grid_x, grid_y):
    """Return the decoded gridpoint JSON document for a grid cell."""
    return _fetch_json(
        f"https://api.weather.gov/gridpoints/{grid_id}/{grid_x},{grid_y}"
    )


def get_raw_forecast(grid_id, grid_x, grid_y):
    """Return the decoded forecast JSON document for a grid cell."""
    # requests.get() has no ``user_agent`` keyword; the agent string has to
    # be sent as an HTTP header.
    return _fetch_json(
        f"https://api.weather.gov/gridpoints/{grid_id}/{grid_x},{grid_y}/forecast",
        headers={"User-Agent": "my-test-app"},
    )


def get_current_rounded_time():
    """Return the current time in the default zone, rounded down to the hour."""
    now = datetime.datetime.now(tz=pytz.timezone(_DEFAULT_TIMEZONE))
    return now.replace(minute=0, second=0, microsecond=0)


def set_timezone(values):
    """Convert the ``"time"`` of every entry to the default time zone.

    The entry dicts are updated in place; the returned list is a new list
    holding those same dicts.
    """
    tz = pytz.timezone(_DEFAULT_TIMEZONE)
    for val in values:
        val["time"] = val["time"].astimezone(tz)
    return list(values)


def make_current(values):
    """Drop entries that start before the current hour."""
    cutoff = get_current_rounded_time()
    return [val for val in values if val["time"] >= cutoff]


def fill_gaps(values):
    """Expand entries lasting more than one hour into hourly entries.

    Each original entry is kept unchanged. For every further whole hour of
    its duration a copy is appended with ``"time"`` advanced by that many
    hours and ``"duration"`` set to one hour.
    """
    one_hour = datetime.timedelta(hours=1)
    filled = []
    for val in values:
        filled.append(val)
        whole_hours = int(val["duration"].total_seconds() // 3600)
        for offset in range(1, whole_hours):
            hourly = val.copy()
            hourly["time"] = val["time"] + datetime.timedelta(hours=offset)
            hourly["duration"] = one_hour
            filled.append(hourly)
    return filled


def normalize(values):
    """Localize, expand to hourly entries and drop entries in the past."""
    values = set_timezone(values)
    # Expand before filtering: a multi-hour period that started earlier but
    # still covers the current hour must contribute its remaining hours,
    # otherwise this series is shifted relative to the others when
    # get_hourly_data() joins them by index.
    values = fill_gaps(values)
    values = make_current(values)
    return values


def celcius_to_fahrenheit(celcius):
    """Convert degrees Celsius to Fahrenheit, truncated to an ``int``."""
    return int(celcius * 9 / 5 + 32)


def parse_duration(duration_str):
    """Parse an ISO-8601 duration string into a ``datetime.timedelta``.

    Supports weeks, days, hours, minutes and seconds, e.g. ``"PT1H"``,
    ``"P1D"``, ``"P10DT6H"`` or ``"PT30M"``.

    Raises:
        ValueError: if the string is not a duration in that form or has no
            components at all.
    """
    match = _DURATION_RE.fullmatch(duration_str)
    parts = {}
    if match is not None:
        parts = {
            unit: int(amount)
            for unit, amount in match.groupdict().items()
            if amount is not None
        }
    if not parts:
        raise ValueError(f"unsupported duration: {duration_str!r}")
    return datetime.timedelta(**parts)


def _parse_valid_time(valid_time):
    """Split a ``"<start>/<duration>"`` string into (datetime, timedelta)."""
    time_str, duration_str = valid_time.split('/')
    start = datetime.datetime.strptime(time_str, _VALID_TIME_FORMAT)
    return start, parse_duration(duration_str)


def _daily_extremes(raw_data, property_name, prefix):
    """Build the un-normalized daily list for max/min temperature.

    *prefix* is ``"high"`` or ``"low"`` and selects the output key names
    (``"<prefix>_celc"`` / ``"<prefix>_fahr"``).
    """
    extremes = []
    for entry in raw_data["properties"][property_name]["values"]:
        celsius = entry["value"]
        start, duration = _parse_valid_time(entry["validTime"])
        extremes.append({
            "time": start,
            "duration": duration,
            f"{prefix}_celc": celsius,
            f"{prefix}_fahr": celcius_to_fahrenheit(celsius),
        })
    return extremes


def _temperature_series(raw_data, property_name):
    """Build a normalized series carrying rounded Celsius and Fahrenheit."""
    series = []
    for entry in raw_data["properties"][property_name]["values"]:
        celsius = round(entry["value"])
        start, duration = _parse_valid_time(entry["validTime"])
        series.append({
            "time": start,
            "duration": duration,
            "value_celc": celsius,
            "value_fahr": celcius_to_fahrenheit(celsius),
        })
    return normalize(series)


def _rounded_series(raw_data, property_name):
    """Build a normalized series whose ``"value"`` is the rounded raw value."""
    series = []
    for entry in raw_data["properties"][property_name]["values"]:
        start, duration = _parse_valid_time(entry["validTime"])
        series.append({
            "time": start,
            "duration": duration,
            "value": round(entry["value"]),
        })
    return normalize(series)


def _degrees_to_cardinal(degrees):
    """Map a compass bearing in degrees to a 16-point direction name."""
    sector = round(degrees / (360. / len(_CARDINAL_DIRECTIONS)))
    return _CARDINAL_DIRECTIONS[sector % len(_CARDINAL_DIRECTIONS)]


def get_daily_highs(raw_data):
    """Return the ``maxTemperature`` entries with Celsius and Fahrenheit."""
    return _daily_extremes(raw_data, "maxTemperature", "high")


def get_daily_lows(raw_data):
    """Return the ``minTemperature`` entries with Celsius and Fahrenheit."""
    return _daily_extremes(raw_data, "minTemperature", "low")


def get_temperature(raw_data):
    """Return the normalized hourly ``temperature`` series."""
    return _temperature_series(raw_data, "temperature")


def get_apparent_temperature(raw_data):
    """Return the normalized hourly ``apparentTemperature`` series."""
    return _temperature_series(raw_data, "apparentTemperature")


def get_humidity(raw_data):
    """Return the normalized hourly ``relativeHumidity`` series."""
    return _rounded_series(raw_data, "relativeHumidity")


def get_wind_chill(raw_data):
    """Return the normalized hourly ``windChill`` series."""
    return _temperature_series(raw_data, "windChill")


def get_wind_speed(raw_data):
    """Return the normalized hourly ``windSpeed`` series."""
    return _rounded_series(raw_data, "windSpeed")


def get_wind_gust(raw_data):
    """Return the normalized hourly ``windGust`` series."""
    return _rounded_series(raw_data, "windGust")


def get_precip_chance(raw_data):
    """Return the normalized hourly ``probabilityOfPrecipitation`` series."""
    return _rounded_series(raw_data, "probabilityOfPrecipitation")


def get_precip_amount(raw_data):
    """Return the normalized hourly ``quantitativePrecipitation`` series."""
    return _rounded_series(raw_data, "quantitativePrecipitation")


def get_snowfall_amount(raw_data):
    """Return the normalized hourly ``snowfallAmount`` series."""
    return _rounded_series(raw_data, "snowfallAmount")


def get_snow_level(raw_data):
    """Return the normalized hourly ``snowLevel`` series."""
    return _rounded_series(raw_data, "snowLevel")


def get_visibility(raw_data):
    """Return the normalized hourly ``visibility`` series."""
    return _rounded_series(raw_data, "visibility")


def get_wind_direction(raw_data):
    """Return the normalized hourly ``windDirection`` series.

    Each ``"value"`` is a 16-point compass name such as ``"NNE"``.
    """
    series = []
    for entry in raw_data["properties"]["windDirection"]["values"]:
        start, duration = _parse_valid_time(entry["validTime"])
        series.append({
            "time": start,
            "duration": duration,
            "value": _degrees_to_cardinal(entry["value"]),
        })
    return normalize(series)


def get_dewpoint(raw_data):
    """Return the normalized hourly ``dewpoint`` series."""
    return _temperature_series(raw_data, "dewpoint")


def _value_or_na(series, index):
    """Return ``series[index]["value"]``, or ``"N/A"`` past the end."""
    if index < len(series):
        return series[index]["value"]
    return "N/A"


def get_hourly_data(raw_data, end_time):
    """Join the hourly series into one list of per-hour dicts.

    Rows follow the temperature series and stop at the first entry whose
    time is not before *end_time*. The other series are matched by index;
    a series that is shorter than the temperature series yields ``"N/A"``.

    Returns:
        A list of dicts with the keys ``"time"``, ``"temp"``, ``"humidity"``,
        ``"precip_chance"``, ``"precip_amount"``, ``"wind_speed"`` and
        ``"wind_direction"``.
    """
    temps = get_temperature(raw_data)
    humidity = get_humidity(raw_data)
    precip_chance = get_precip_chance(raw_data)
    precip_amount = get_precip_amount(raw_data)
    wind_speed = get_wind_speed(raw_data)
    wind_direction = get_wind_direction(raw_data)

    hourly = []
    for i, temp in enumerate(temps):
        if temp["time"] >= end_time:
            break
        hourly.append({
            "time": temp["time"],
            "temp": temp["value_fahr"],
            "humidity": _value_or_na(humidity, i),
            "precip_chance": _value_or_na(precip_chance, i),
            "precip_amount": _value_or_na(precip_amount, i),
            "wind_speed": _value_or_na(wind_speed, i),
            "wind_direction": _value_or_na(wind_direction, i),
        })
    return hourly