"""Cluster GPS waypoints into routes and balance them by estimated trip time.

Trip durations and waypoint orderings are fetched from the HTTP trip service
at ``_TRIP_SERVICE_URL``. Its URL layout and response fields (``trips``,
``waypoints``, ``waypoint_index``) look like the OSRM trip API -- confirm
against the deployed server.

Coordinates handled by this module are ``[lat, lon]`` pairs;
``list_to_string`` swaps them to ``lon,lat`` when building request URLs.
"""
import warnings

import numpy as np
import pandas as pd
import requests
from sklearn.cluster import KMeans

_TRIP_SERVICE_URL = "http://acetyl.net:5000/trip/v1/bike/"
_TRIP_QUERY = "?roundtrip=false&source=first&destination=last"
# Without a timeout ``requests`` waits forever on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 60
# Upper bound on waypoint moves while balancing routes. The previous recursive
# implementation was implicitly bounded by the interpreter recursion limit.
_MAX_REBALANCE_MOVES = 1000


def cluster_and_optimize(df, centroids, end, time_diff=0.25, max_time=24, n=2):
    """
    Cluster the waypoints in ``df`` around ``centroids``, balance the resulting
    routes by trip time, and trim routes that exceed ``max_time``.

    ``df`` is modified in place: the columns ``normalized_gps`` and ``cluster``
    are added (or overwritten). Waypoints that are dropped from a route to
    satisfy ``max_time`` keep whatever cluster label they had before; they are
    simply absent from the returned routes.

    :param df: a dataframe with a ``gps`` column of ``[lat, lon]`` pairs
    :param centroids: a list of ``[lat, lon]`` centroids, one per route; each
        centroid is also used as the start point of its route
    :param end: the end point of every trip, already formatted for the trip
        service URL (``"lon,lat"``)
    :param time_diff: the maximum allowed difference (hours) between the
        longest trip and the average trip
    :param max_time: the maximum time (hours) of a single trip
    :param n: the number of routes to create (passed through; currently unused
        by the balancing step)
    :return: a tuple ``(df, routes)`` where ``routes[i]`` is the list of
        ``[lat, lon]`` waypoints assigned to centroid ``i``
    """
    # Normalise coordinates and centroids onto the same [0, 1] scale so both
    # axes contribute equally to the clustering distance.
    df["normalized_gps"], norm_centroids = __normalize_gps(
        df["gps"].values.tolist(), centroids
    )

    # Seed k-means with the caller's centroids. An ndarray ``init`` with
    # ``n_init=1`` is what scikit-learn expects for explicit initial centres.
    kmeans = KMeans(
        n_clusters=len(norm_centroids),
        init=np.asarray(norm_centroids, dtype=float),
        n_init=1,
    )
    kmeans.fit(df["normalized_gps"].values.tolist())
    df["cluster"] = kmeans.labels_

    routes = []
    starts = []
    for label, centroid in enumerate(centroids):
        routes.append(df[df["cluster"] == label]["gps"].values.tolist())
        starts.append(list_to_string([centroid]))

    routes = __minimize_route_time_diff(routes, starts, end, time_diff, n)

    # Trim each route until it fits in ``max_time`` and relabel the waypoints
    # that ended up in it (balancing may have moved them between routes).
    gps_as_text = df["gps"].astype(str)
    for label, start in enumerate(starts):
        routes[label] = __remove_longest_waypoints(
            routes[label], start, end, max_time
        )
        in_route = gps_as_text.isin([str(point) for point in routes[label]])
        df.loc[in_route, "cluster"] = label

    return df, routes


def list_to_string(list_of_lists):
    """
    Format ``[lat, lon]`` pairs as a trip-service coordinate string.

    Each pair becomes ``"lon,lat;"`` (note the swapped order and the trailing
    semicolon), and the pieces are concatenated.

    :param list_of_lists: a list of ``[lat, lon]`` pairs
    :return: the concatenated coordinate string, ``""`` for an empty list
    """
    return "".join(
        str(point[1]) + "," + str(point[0]) + ";" for point in list_of_lists
    )


def _fetch_trip(coordinate_string, start, end):
    """
    Query the trip service for the route ``start`` -> waypoints -> ``end``.

    :param coordinate_string: the waypoints, as produced by ``list_to_string``
    :param start: the start point, as produced by ``list_to_string``
    :param end: the end point (``"lon,lat"``)
    :return: the decoded JSON response
    :raises requests.HTTPError: if the service answers with an error status
    :raises requests.Timeout: if the service does not answer in time
    """
    response = requests.get(
        _TRIP_SERVICE_URL + start + coordinate_string + end + _TRIP_QUERY,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    return response.json()


def create_json_df(coordinate_string, start, end):
    """
    Fetch the optimised trip and return its waypoints in visiting order.

    :param coordinate_string: a string of coordinates
    :param start: the start point of the trip
    :param end: the end point of the trip
    :return: a dataframe of the service's waypoints, with added ``lat`` and
        ``lon`` columns, sorted by ``waypoint_index``
    """
    trip = _fetch_trip(coordinate_string, start, end)

    df = pd.DataFrame(trip["waypoints"])

    # The request sends coordinates as "lon,lat" and OSRM-style services
    # answer with ``location == [lon, lat]``, so latitude is element 1.
    # NOTE(review): the previous code read lat from element 0 -- confirm the
    # server follows the OSRM ordering.
    df["lat"] = df["location"].apply(lambda location: location[1])
    df["lon"] = df["location"].apply(lambda location: location[0])
    df["waypoint_index"] = df["waypoint_index"].astype(int)

    return df.sort_values(by=["waypoint_index"])


def get_trip_time(coordinate_string, num_waypoints, start, end, time_per_waypoint=90):
    """
    Return the total trip time in hours: travel time plus time at waypoints.

    :param coordinate_string: a string of coordinates
    :param num_waypoints: the number of waypoints
    :param start: the start point of the trip
    :param end: the end point of the trip
    :param time_per_waypoint: the time spent at each waypoint, in seconds
    :return: the trip time in hours
    """
    trip = _fetch_trip(coordinate_string, start, end)

    travel_time_seconds = int(trip["trips"][0]["duration"])
    waypoint_time_seconds = num_waypoints * time_per_waypoint
    return (travel_time_seconds + waypoint_time_seconds) / 3600


def __minimize_route_time_diff(routes, starts, end, time_diff, n):
    """
    Balance routes by repeatedly moving one waypoint from the longest route to
    the shortest one until the longest trip is within ``time_diff`` hours of
    the average trip.

    The waypoint moved is the one in the longest route that lies closest to
    the mean centre of the shortest route. ``routes`` is modified in place.
    Every pass queries the trip service once per route.

    Balancing stops early, with a ``RuntimeWarning``, when no waypoint can be
    moved (the longest or the shortest route is empty) or after
    ``_MAX_REBALANCE_MOVES`` moves.

    :param routes: the list of lists of coordinates
    :param starts: the list of start points, one per route
    :param end: the end point
    :param time_diff: the allowed difference from the average trip, in hours
    :param n: the number of routes (unused)
    :return: the balanced list of lists of coordinates (same object as
        ``routes``)
    """
    if not routes:
        return routes

    for _ in range(_MAX_REBALANCE_MOVES):
        times = [
            get_trip_time(list_to_string(route), len(route), start, end)
            for route, start in zip(routes, starts)
        ]
        order = np.argsort(times)
        shortest = routes[order[0]]
        longest = routes[order[-1]]

        overshoot = times[order[-1]] - np.mean(times)
        # Written as ``not >`` so a NaN overshoot ends the loop, as before.
        if not overshoot > time_diff:
            return routes

        if not longest or not shortest:
            # Nothing to move, or no mean centre to move towards.
            warnings.warn(
                "Routes could not be balanced further: a route is empty.",
                RuntimeWarning,
                stacklevel=2,
            )
            return routes

        moved = __find_closest_coordinate(longest, __mean_center(shortest))
        shortest.append(moved)
        longest.remove(moved)

    warnings.warn(
        "Route balancing stopped after %d moves without converging."
        % _MAX_REBALANCE_MOVES,
        RuntimeWarning,
        stacklevel=2,
    )
    return routes


def __remove_longest_waypoints(route_coordinates, start, end, max_time):
    """
    Drop waypoints from a route until its trip time is at most ``max_time``.

    On each pass the waypoint farthest from the route's mean centre is
    removed. ``route_coordinates`` is modified in place. Every pass queries
    the trip service once. If the route is still too long with no waypoints
    left, the empty list is returned.

    :param route_coordinates: the list of coordinates
    :param start: the start point
    :param end: the end point
    :param max_time: the maximum time, in hours
    :return: the trimmed list of coordinates (same object as
        ``route_coordinates``)
    """
    while True:
        route_time = get_trip_time(
            list_to_string(route_coordinates), len(route_coordinates), start, end
        )
        if not route_time > max_time or not route_coordinates:
            return route_coordinates

        center = __mean_center(route_coordinates)
        route_coordinates.remove(
            __find_farthest_coordinate(route_coordinates, center)
        )


def __normalize_gps(coordinates, centroids):
    """
    Min-max normalise coordinates and centroids using the coordinates' bounds.

    The centroids are scaled with the same bounds as the coordinates, so a
    centroid outside the coordinates' bounding box maps outside ``[0, 1]``.

    :param coordinates: the list of ``[lat, lon]`` coordinates (non-empty)
    :param centroids: the list of ``[lat, lon]`` centroids
    :return: the list of normalized coordinates and the list of normalized
        centroids
    """
    latitudes = [point[0] for point in coordinates]
    longitudes = [point[1] for point in coordinates]

    min_lat, max_lat = min(latitudes), max(latitudes)
    min_lon, max_lon = min(longitudes), max(longitudes)

    def scale(point):
        return [
            __min_max_normalize(point[0], min_lat, max_lat),
            __min_max_normalize(point[1], min_lon, max_lon),
        ]

    normalized_coordinates = [scale(point) for point in coordinates]
    normalized_centroids = [scale(point) for point in centroids]
    return normalized_coordinates, normalized_centroids


def __min_max_normalize(value, min_value, max_value):
    """
    Scale ``value`` so that ``min_value`` maps to 0 and ``max_value`` to 1.

    :param value: the value
    :param min_value: the minimum value
    :param max_value: the maximum value
    :return: the normalized value; ``0.0`` when ``min_value == max_value``
        (the axis carries no information, and dividing would fail)
    """
    span = max_value - min_value
    if span == 0:
        return 0.0
    return (value - min_value) / span


def __find_closest_coordinate(coordinates, centroid):
    """
    Return the coordinate closest to ``centroid``.

    :param coordinates: the non-empty list of coordinates
    :param centroid: the centroid
    :return: the coordinate in the list that is closest to the centroid; the
        first one wins on ties
    """
    return min(coordinates, key=lambda point: __distance(point, centroid))


def __find_farthest_coordinate(coordinates, centroid):
    """
    Return the coordinate farthest from ``centroid``.

    :param coordinates: the non-empty list of coordinates
    :param centroid: the centroid
    :return: the coordinate in the list that is farthest from the centroid;
        the first one wins on ties
    """
    return max(coordinates, key=lambda point: __distance(point, centroid))


def __mean_center(coordinates):
    """
    Return the arithmetic mean of a list of coordinates.

    :param coordinates: the non-empty list of coordinates
    :return: the mean center of the coordinates as a two-element list
    :raises ZeroDivisionError: if ``coordinates`` is empty
    """
    count = len(coordinates)
    return [
        sum(point[0] for point in coordinates) / count,
        sum(point[1] for point in coordinates) / count,
    ]


def __distance(coordinate1, coordinate2):
    """
    Return the straight-line (Euclidean) distance between two coordinates.

    The distance is computed directly on the coordinate values, with no
    geodesic correction.

    :param coordinate1: the first coordinate
    :param coordinate2: the second coordinate
    :return: the distance between the two coordinates
    """
    return (
        (coordinate1[0] - coordinate2[0]) ** 2
        + (coordinate1[1] - coordinate2[1]) ** 2
    ) ** 0.5