Create a route optimization algorithm with zero costs using google's OR-tools and OSRM Part 1
This is a tutorial for an algorithm that has matured over years of experience building algorithms and websites for carpooling, Uber-like services, freight routing, and many other applications.
I will present the current algorithm and discuss my decisions on certain code implementations, as well as some earlier decisions that I took and that proved to be wrong, slow, or error-prone.
This algorithm is capable of optimizing for thousands of locations, and made to be highly customizable.
Features of the algorithm are:
- Reading data from and exporting data to an excel sheet or a Database.
- Multiple time windows per location.
- Multiple vehicle types, capacities, buffers, depots.
- Custom optimization parameters like distance, time, fuel consumption, etc.
- Pickup and dropoff.
- Custom loading and unloading time per stop.
- Custom service time at certain locations.
- Custom max idle time waiting for the next stop's time window.
- Custom distributed workload amongst all vehicles.
- Custom vehicle start and End time.
- Max Distance, stops and time per vehicle.
Graphical interface:
- Can be done using google maps or any of the many free services.
- Routes for each vehicle can be colored differently, and the markers' icons can be changed as well and can be dynamic (bouncy, for instance).
- An algorithm to get around, for free, Google's limit on the number of waypoints per route, which can otherwise cost thousands.
Sample of the route optimization algorithm Sample of the optimized routes using leaflet |
Sample of the optimized routes using leaflet |
In case you want to create a Database, read from and write to it, checkout my other tutorial first.
First we create a python file named initializers that holds all our constants to keep our main code clean.
initializers.py
"""Constants and logger objects shared by the algorithm, monitor and mailer."""
import logging
from logging.handlers import RotatingFileHandler

#------------------------------------------ database -----------------------------------------------------------
DB_READ_INTERVAL = 10   # secs
ZERO_LIMIT = 60 * 60    # secs
UNABLE_LIMIT = 30 * 60  # secs
REQUEST_LIMIT = 2000
HOSTNAME = '127.0.0.1'
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store before deploying.
PASSWORD = 'Ilovealgorithms'
DATABASE = 'carpool'
USERNAME = 'Algoguy'
#-----------------------------------------------------------------------------------------------------------------
# --------------------------------------- algorithm ------------------------------------------------------------
OSRM_URL = 'http://127.0.0.1:5000'
OSM_READ_INTERVAL = 5  # secs
TIME_FACTOR = 60       # mins
IGNORE_TIME = True
KM_LIMIT = 500         # km
#------------------------------------------------------------------------------------------------------------------
#--------------------------------------------- Algorithm logging --------------------------------------------
# LOGFILE, MAX_LOG_SIZE and MAX_LOG_NO are assigned again in the monitor
# section below, so after import they hold the monitor values.
LOGFILE = 'Algorithm_log.log'
MAX_LOG_SIZE = 5  # MB
MAX_LOG_NO = 20
# Each record: timestamp, calling function, level, [file:line], message.
log_formatter = logging.Formatter(
    '%(asctime)s, func_name: %(funcName)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
    '%Y-%m-%d -- %H:%M:%S')
app_handler = RotatingFileHandler(LOGFILE,
                                  mode='a',
                                  maxBytes=MAX_LOG_SIZE * 1024 * 1024,  # MB -> bytes
                                  backupCount=MAX_LOG_NO,
                                  encoding=None,
                                  delay=0)
app_handler.setFormatter(log_formatter)
app_handler.setLevel(logging.INFO)
app_log = logging.getLogger('algorithm')
app_log.setLevel(logging.INFO)
app_log.addHandler(app_handler)
#-----------------------------------------------------------------------------------------------------------------
#------------------------------------------------ Monitor -----------------------------------------------------
MEMORY_USAGE_LIMIT = 90  # Percent
CPU_USAGE_LIMIT = 90     # Percent
#---------------------------------------------------------------------------------------------------------------
#----------------------------------------------- Monitor logging -------------------------------------------
LOGFILE = 'Monitor_log.log'
MAX_LOG_SIZE = 5  # MB
MAX_LOG_NO = 20
Log_formatter = logging.Formatter(
    '%(asctime)s, func_name: %(funcName)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
    '%Y-%m-%d -- %H:%M:%S')
my_handler = RotatingFileHandler(LOGFILE,
                                 mode='a',
                                 maxBytes=MAX_LOG_SIZE * 1024 * 1024,  # MB -> bytes
                                 backupCount=MAX_LOG_NO,
                                 encoding=None,
                                 delay=0)
my_handler.setFormatter(Log_formatter)
my_handler.setLevel(logging.INFO)
monitor_log = logging.getLogger('monitor')
monitor_log.setLevel(logging.INFO)
monitor_log.addHandler(my_handler)
#----------------------------------------------------------------------------------------------------------
#----------------------------------------------- Send_mail ---------------------------------------------
RETRY_SEND_LIMIT = 10
filenames = ['Algorithm_log.log',
             'Monitor_log.log',
             'serveme_log.log']
logging.Formatter('%(asctime)s, func_name: %(funcName)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s','%Y-%m-%d -- %H:%M:%S')
initializes the format of our output to the log file so each line starts with the date and time, the function name within which the logging function was called, then the severity(level) of the message then the filename that contains the function and the line number.
First we start reading the data from the db and building our events list.
We have two ways to do it.
Method 1 lets the db do all the hard work; the downside is that the db could crash or return weird results if it is big.
def read_event_arr():
    """Method 1: read offers, requests and events with one JOIN query.

    The published snippet had lost its ``def`` line and indentation; the
    function name used here is a placeholder for this listing.

    Loops until the query returns rows. Empty results and connection errors
    are retried every DB_READ_INTERVAL seconds, and Send_mail() is called
    once such a bad period has lasted longer than ZERO_LIMIT seconds. An
    OperationalError whose text contains "violate" is logged as critical,
    mailed, and terminates the process.

    Returns:
        list of pandas.DataFrame, one per distinct event_id, each re-indexed
        from 0.
    """
    app_log.info("Reading request table to prepare event_arr")
    today = datetime.datetime.now()
    # Window: today at midnight until one minute before the end of tomorrow.
    time_now = today.strftime("%Y-%m-%d") + ' 00:00:00'
    time_future = (today + datetime.timedelta(days=1)).strftime("%Y-%m-%d") + ' 23:59:00'
    query = ("SELECT offer.lat, offer.lon, offer.id, offer.capacity, "
             "request.lat, request.lon, request.id, request.trip_id, "
             "offer.event_id, event.event_time, event.lat, event.lon "
             "FROM ((offer INNER JOIN request ON request.event_id = offer.event_id)"
             "INNER JOIN event ON offer.event_id = event.id) "
             "WHERE event.lat > 51.2 AND event.lat < 51.8 "
             "AND event.lon > -0.62 AND event.lon < 0.38 "
             "AND event.event_time between %s and %s;")
    # One name per selected column, in SELECT order (12 in total; the
    # original list omitted request_trip_id and could not be assigned).
    columns = ['offer_lat', 'offer_lon', 'offer_acc_id', 'offer_capacity',
               'request_lat', 'request_lon', 'request_acc_id', 'request_trip_id',
               'event_id', 'event_time', 'event_lat', 'event_lon']
    Zero_time = False  # False, or the time() at which the bad period started
    while True:
        try:
            conn = psycopg2.connect(host=HOSTNAME,
                                    user=USERNAME,
                                    password=PASSWORD,
                                    dbname=DATABASE)
            try:
                # `with conn` only wraps the transaction; the connection
                # itself is closed in the finally clause.
                with conn:
                    with conn.cursor(name='my_cursor',
                                     cursor_factory=psycopg2.extras.DictCursor) as cur:  # named server-side cursor
                        cur.execute(query, (time_now, time_future))
                        df = pd.DataFrame(np.array(cur.fetchall()))
            finally:
                conn.close()
        except psycopg2.OperationalError as e:
            if "violate" in str(e):
                app_log.critical(e)
                Send_mail()
                sleep(DB_READ_INTERVAL)
                exit()
            app_log.error("warning: Unable to connect")
            app_log.error("Retrying in %s Secs"%DB_READ_INTERVAL)
            app_log.debug(str(e))
            if not Zero_time:
                Zero_time = time()
            elif time() - Zero_time > ZERO_LIMIT:
                Send_mail()
                Zero_time = False
            sleep(DB_READ_INTERVAL)
            continue

        if len(df) == 0:
            app_log.warning("returned 0 results")
            if not Zero_time:
                Zero_time = time()
            elif time() - Zero_time > ZERO_LIMIT:
                Send_mail()
                Zero_time = False
            # Pause before retrying so an empty table does not turn into a
            # tight query loop against the db.
            sleep(DB_READ_INTERVAL)
            continue

        df.columns = columns
        app_log.info("Done reading request table")
        # One dataframe per event, holding that event's offer/request rows.
        return [df[df['event_id'] == ID].reset_index(drop=True)
                for ID in np.unique(df['event_id']).tolist()]
What comes after the JOIN keyword is called the secondary table in some texts.
SELECT offer.lat, offer.lon, offer.id, offer.capacity, request.lat, request.lon, request.id, request.trip_id, offer.event_id, event.event_time, event.lat, event.lon FROM Those are the main tables and we specify which columns to return from each respectively after we get the INNER JOIN results with the secondary tables.
We don't need to query the db for all the days in one go since the future offers and requests will change anyways, and when the db grows, that will be a resources intensive query with no obvious benefit.
So we query between time_now, which is the current date datetime.datetime.now().strftime("%Y-%m-%d") (year-month-day) starting at midnight ' 00:00:00', and time_future, which is one day later (datetime.timedelta(days = 1)); for it we take the year-month-day as well, but just before the following midnight, + ' 23:59:00'.
Zero_time is basically the time period where i get undesirable results from the db, let that be zero results or the db is not responding at all.
cur.execute(...) runs the query on the server and cur.fetchall() returns all the rows of the result.
np.array transforms the result into a numpy array.
pd.DataFrame transforms the numpy array into a pandas DataFrame.
pandas has a function pd.read_sql which can read a table easily without all the fuss but when you need to execute certain queries, that function won't be helpful.
If there is any data returned if len(df) > 0: then we give column names to the pandas dataframe df.columns = [....] as data returned by the sql query respects the order of the SELECT method but it doesn't return the names, just data.
If no data is returned, we check whether this is the first time (if Zero_time == False:) and, if so, start the timer; otherwise we check whether more than ZERO_LIMIT seconds have passed, and if so it sends me an email with Send_mail() (we will discuss that function later) and resets the timer with Zero_time = False. Either way we then continue the db querying loop.
All of the above functions lie within the try: block,
If an exception happens, it is handled by except psycopg2.OperationalError as e:, which catches connection errors, since the only thing that can happen here without being considered dangerous is a connection timeout.
I couldn't find good documentation for psycopg2, but sometimes a violation can raise an OperationalError, so I look for "violate" in the returned error; in that case (if "violate" in str(e):) my code intentionally exits and writes a critical error to the log.
If an exception happens then we do the Zero_time loop check again but now we sleep for a certain sleep(DB_READ_INTERVAL) instead of swarming the server with queries and making it slow.
If everything in the try: block works then we go into the else: block
np.unique(df['event_id']).tolist() returns a list without any duplicates from df['event_id']; that's needed because the dataframe will hold duplicate event ids corresponding to the respective requests and offers.
we loop over that list to generate an events array such that each array holds requests and offers with the same event_id
df['event_id']==ID will return an array of the same size but with all the rows with event_id == ID set to True and the others set to False.
df[df['event_id']==ID] will return only rows that have same event_id == ID.
reset_index(drop=True) will reset the original indices of the rows and reindex them starting from 0 so we can call them by index incrementally later.
The method above was method 1 and as you can see, it doesn't involve much coding, but it lets the db do all the work.
Next is method2
def read_events():
    """Read the events for today and tomorrow inside the fixed lat/lon box.

    Retries every DB_READ_INTERVAL seconds until the query returns at least
    one row. Send_mail() is called when empty results have lasted longer
    than ZERO_LIMIT seconds, or connection failures longer than UNABLE_LIMIT
    seconds, and on every OperationalError whose text contains "violate".

    Returns:
        pandas.DataFrame with columns event_id, event_time, place_lat and
        place_lon.
    """
    today = datetime.datetime.now()
    # Window: today at midnight until one minute before the end of tomorrow.
    time_now = today.strftime("%Y-%m-%d") + ' 00:00:00'
    time_future = (today + datetime.timedelta(days=1)).strftime("%Y-%m-%d") + ' 23:59:00'
    # The published query selected place.lat/place.lon although `place` is
    # not in the FROM clause; the coordinates are read from `event`, the
    # table the WHERE clause already filters on.
    # NOTE(review): confirm against the real schema.
    query = ("SELECT event.id, event.event_time, event.lat, event.lon "
             "FROM event "
             "WHERE event.lat > 51.2 AND event.lat < 51.8 "
             "AND event.lon > -0.62 AND event.lon < 0.38 "
             "AND event.event_time between %s and %s;")
    Zero_time = False  # False, or the time() at which the bad period started
    while True:
        app_log.info('Reading events to places')
        try:
            conn = psycopg2.connect(host=HOSTNAME,
                                    user=USERNAME,
                                    password=PASSWORD,
                                    dbname=DATABASE)
            try:
                # `with conn` only wraps the transaction; the connection
                # itself is closed in the finally clause.
                with conn:
                    with conn.cursor(name='my_cursor',
                                     cursor_factory=psycopg2.extras.DictCursor) as cur:  # named server-side cursor
                        cur.execute(query, (time_now, time_future))
                        df = pd.DataFrame(np.array(cur.fetchall()))
            finally:
                conn.close()
        except psycopg2.OperationalError as e:
            if "violate" in str(e):
                app_log.critical(e)
                Send_mail()
                sleep(DB_READ_INTERVAL)
                continue
            app_log.error("Unable to connect to db")
            app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
            app_log.debug(str(e))
            if not Zero_time:
                Zero_time = time()
            elif time() - Zero_time > UNABLE_LIMIT:
                Send_mail()
                Zero_time = False
            sleep(DB_READ_INTERVAL)
            continue

        if len(df) > 0:
            df.columns = ['event_id', 'event_time', 'place_lat', 'place_lon']
            app_log.info('found '+str(len(df))+' events')
            return df

        app_log.warning("returned 0 results for events to places")
        if not Zero_time:
            Zero_time = time()
        elif time() - Zero_time > ZERO_LIMIT:
            Send_mail()
            Zero_time = False
        app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
        sleep(DB_READ_INTERVAL)
Next we use those events to query for the offers and requests going to those events.
def read_offers(Events):
    """Read the offers that go to the given events.

    The events are split into chunks by splice_arr() and one query is sent
    per chunk; the rows of all chunks are collected into Final_df.

    Args:
        Events: event ids, passed to splice_arr(). Each id is concatenated
            into the SQL text, so ids are expected to be strings.

    Returns:
        pandas.DataFrame with columns offer_lat, offer_lon, offer_acc_id,
        offer_capacity and event_id, or False when no offers were found or
        the database could not be reached (after sleeping
        DB_READ_INTERVAL seconds).
    """
    columns = ['offer_lat', 'offer_lon', 'offer_acc_id', 'offer_capacity', 'event_id']
    # NOTE(review): Zero_time is local and every path that sets it returns
    # right afterwards, so the Send_mail() branches guarded by it do not
    # appear reachable; kept as published.
    Zero_time = False
    reading = time()
    while True:
        try:
            frames = []
            spliced_arr = splice_arr(Events)
            for arr in spliced_arr:
                if len(arr) == 0:
                    # An empty chunk would leave a dangling WHERE clause.
                    continue
                statement = 'SELECT lat, lon, id, capacity, event_id FROM offer WHERE'
                for event in arr:
                    statement += " event_id = '" + event + "' OR"
                statement = statement[:-3]  # drop the trailing " OR"
                conn = psycopg2.connect(host=HOSTNAME,
                                        user=USERNAME,
                                        password=PASSWORD,
                                        dbname=DATABASE)
                try:
                    # `with conn` only wraps the transaction; the connection
                    # itself is closed in the finally clause.
                    with conn:
                        with conn.cursor(name='my_cursor',
                                         cursor_factory=psycopg2.extras.DictCursor) as cur:
                            cur.execute(statement)
                            df = pd.DataFrame(np.array(cur.fetchall()))
                finally:
                    conn.close()
                if len(df) > 0:
                    df.columns = columns
                    frames.append(df)
            # DataFrame.append was removed in pandas 2.0; concatenate the
            # per-chunk frames once instead.
            if frames:
                Final_df = pd.concat(frames, ignore_index=True)
            else:
                Final_df = pd.DataFrame({name: [] for name in columns})
            if len(Final_df) > 0:
                app_log.info('Got '+str(len(Final_df))+ ' offers')
            else:
                app_log.warning("got 0 offers for the events\n Retrying in %s Secs"%DB_READ_INTERVAL)
                if not Zero_time:
                    Zero_time = time()
                elif time() - Zero_time > ZERO_LIMIT:
                    Send_mail()
                    Zero_time = False
                sleep(DB_READ_INTERVAL)
                return False
        except psycopg2.OperationalError as e:
            if "violate" in str(e):
                app_log.critical(e)
                Send_mail()
                sleep(DB_READ_INTERVAL)
                continue
            app_log.error("Unable to connect to get offer_to_event")
            app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
            app_log.debug(str(e))
            if not Zero_time:
                Zero_time = time()
            elif time() - Zero_time > UNABLE_LIMIT:
                Send_mail()
                Zero_time = False
            sleep(DB_READ_INTERVAL)
            return False
        else:
            app_log.info('Time reading offers%i'%int(time()-reading))
            app_log.info("Done reading offers going to events")
            return Final_df
Instead of querying for offers going to all of the events in one go, we can splice them so that each query covers at most REQUEST_LIMIT events:
Zero_time = False
reading = time()
while True:
try:
Final_df = pd.DataFrame({'offer_lat': [],
'offer_lon': [],
'offer_acc_id': [],
'offer_capacity': [],
'event_id': []})
spliced_arr = splice_arr(Events)
for arr in spliced_arr:
statement = 'SELECT lat, lon, id, capacity, event_id FROM offer WHERE'
for event in arr:
statement += " event_id = '" + event + "' OR"
statement = statement[:-3]
with psycopg2.connect(host=HOSTNAME,
user=USERNAME,
password=PASSWORD,
dbname=DATABASE) as conn:
with conn.cursor(name='my_cursor', cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(statement)
df = pd.DataFrame(np.array(cur.fetchall()))
if len(df) > 0:
df.columns = ['offer_lat', 'offer_lon', 'offer_acc_id', 'offer_capacity', 'event_id']
Final_df = Final_df.append(df,ignore_index=True)
if len(Final_df) > 0:
app_log.info('Got '+str(len(Final_df))+ ' offers')
else:
app_log.warning("got 0 offers for the events\n Retrying in %s Secs"%DB_READ_INTERVAL)
if Zero_time == False:
Zero_time = time()
else:
if time() - Zero_time > ZERO_LIMIT:
Send_mail()
Zero_time = False
sleep(DB_READ_INTERVAL)
return False
except psycopg2.OperationalError as e:
if "violate" in str(e):
app_log.critical(e)
Send_mail()
sleep(DB_READ_INTERVAL)
continue
app_log.error("Unable to connect to get offer_to_event")
app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
app_log.debug(str(e))
if Zero_time == False:
Zero_time = time()
else:
if time() - Zero_time > UNABLE_LIMIT:
Send_mail()
Zero_time = False
sleep(DB_READ_INTERVAL)
return False
else:
app_log.info('Time reading offers%i'%int(time()-reading))
app_log.info("Done reading offers going to events")
return Final_df
That's what splice_arr does, and we do that so as not to bog down the db.
Assuming REQUEST_LIMIT is 2 and we have 5 events ['a','b','c','d','e'] then splice_arr will return [['a','b'], ['c','d'],['e']].
Next we iterate over that array to create the query statement, if we take the first one ['a', 'b'] our statement will be "SELECT lat, lon, id, capacity, event_id FROM offer WHERE event_id = 'a' OR event_id = 'b' OR" then we execute statement = statement[:-3] which removes the last unnecessary part in the end of the statement OR .
Then we query for that statement and save the result into Final_df
Next we query for the requests using the events of the offers, as querying for all the events is useless, we only need events which have offers.
def read_requests(Events):
    """Read the requests that go to the given events.

    The events are split into chunks by splice_arr() and one query is sent
    per chunk; the rows of all chunks are collected into Final_df.

    Args:
        Events: event ids, passed to splice_arr(). Each id is concatenated
            into the SQL text, so ids are expected to be strings.

    Returns:
        pandas.DataFrame with columns request_lat, request_lon,
        request_acc_id, request_trip_id and event_id, or False when no
        requests were found or the database could not be reached (after
        sleeping DB_READ_INTERVAL seconds).
    """
    app_log.info("reading requests going to events")
    columns = ['request_lat', 'request_lon', 'request_acc_id', 'request_trip_id', 'event_id']
    # NOTE(review): Zero_time is local and every path that sets it returns
    # right afterwards, so the Send_mail() branches guarded by it do not
    # appear reachable; kept as published.
    Zero_time = False
    reading = time()
    while True:
        try:
            frames = []
            spliced_arr = splice_arr(Events)
            for arr in spliced_arr:
                if len(arr) == 0:
                    # An empty chunk would leave a dangling WHERE clause.
                    continue
                statement = 'SELECT lat, lon, id, trip_id, event_id FROM request WHERE'
                for request in arr:
                    statement += " event_id = '" + request + "' OR"
                statement = statement[:-3]  # drop the trailing " OR"
                conn = psycopg2.connect(host=HOSTNAME,
                                        user=USERNAME,
                                        password=PASSWORD,
                                        dbname=DATABASE)
                try:
                    # `with conn` only wraps the transaction; the connection
                    # itself is closed in the finally clause.
                    with conn:
                        with conn.cursor(name='my_cursor',
                                         cursor_factory=psycopg2.extras.DictCursor) as cur:
                            cur.execute(statement)
                            df = pd.DataFrame(np.array(cur.fetchall()))
                finally:
                    conn.close()
                if len(df) > 0:
                    df.columns = columns
                    frames.append(df)
            # DataFrame.append was removed in pandas 2.0; concatenate the
            # per-chunk frames once instead.
            if frames:
                Final_df = pd.concat(frames, ignore_index=True)
            else:
                Final_df = pd.DataFrame({name: [] for name in columns})
            if len(Final_df) > 0:
                app_log.info('Got '+str(len(Final_df))+ ' requests')
                Zero_time = False
            else:
                app_log.warning("got 0 requests for the events\n Retrying in %s Secs"%DB_READ_INTERVAL)
                if not Zero_time:
                    Zero_time = time()
                elif time() - Zero_time > ZERO_LIMIT:
                    Send_mail()
                    Zero_time = False
                sleep(DB_READ_INTERVAL)
                return False
        except psycopg2.OperationalError as e:
            if "violate" in str(e):
                app_log.critical(e)
                Send_mail()
                sleep(DB_READ_INTERVAL)
                continue
            app_log.error("Unable to connect to get request_to_event")
            app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
            app_log.debug(e)
            if not Zero_time:
                Zero_time = time()
            # Connection failures use UNABLE_LIMIT, matching read_events
            # and read_offers.
            elif time() - Zero_time > UNABLE_LIMIT:
                Send_mail()
                Zero_time = False
            sleep(DB_READ_INTERVAL)
            return False
        else:
            app_log.info('total request time %i'%int(time()-reading))
            app_log.info("Done reading requests going to events")
            return Final_df
app_log.info("reading requests going to events")
Zero_time = False
reading = time()
while True:
try:
Final_df = pd.DataFrame({'request_lat': [],
'request_lon': [],
'request_acc_id': [],
'request_trip_id': [],
'event_id': []})
spliced_arr = splice_arr(Events)
for arr in spliced_arr:
statement = 'SELECT lat, lon, id, trip_id, event_id FROM request WHERE'
for request in arr:
statement += " event_id = '" + request + "' OR"
statement = statement[:-3]
with psycopg2.connect(host=HOSTNAME,
user=USERNAME,
password=PASSWORD,
dbname=DATABASE) as conn:
with conn.cursor(name='my_cursor', cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(statement)
df = pd.DataFrame(np.array(cur.fetchall()))
if len(df) > 0:
df.columns = ['request_lat', 'request_lon', 'request_acc_id', 'request_trip_id', 'event_id']
Final_df = Final_df.append(df,ignore_index=True)
if len(Final_df) > 0:
app_log.info('Got '+str(len(Final_df))+ ' requests')
Zero_time = False
else:
app_log.warning("got 0 requests for the events\n Retrying in %s Secs"%DB_READ_INTERVAL)
if Zero_time == False:
Zero_time = time()
else:
if time() - Zero_time > ZERO_LIMIT:
Send_mail()
Zero_time = False
sleep(DB_READ_INTERVAL)
return False
except psycopg2.OperationalError as e:
if "violate" in str(e):
app_log.critical(e)
Send_mail()
sleep(DB_READ_INTERVAL)
continue
app_log.error("Unable to connect to get request_to_event")
app_log.info("Retrying in %s Secs"%DB_READ_INTERVAL)
app_log.debug(e)
if Zero_time == False:
Zero_time = time()
else:
if time() - Zero_time > ZERO_LIMIT:
Send_mail()
Zero_time = False
sleep(DB_READ_INTERVAL)
return False
else:
app_log.info('total request time %i'%int(time()-reading))
app_log.info("Done reading requests going to events")
return Final_df
Next we prepare the data that we will Feed into OR-tools in our next tutorial
Comments
Post a Comment