#!/usr/bin/python
# -*- coding: utf-8 -*-
"""command shell for mini-project-1"""
import cmd
import sqlite3
from getpass import getpass
from logging import getLogger
import pendulum
from mini_project_1.book_member import get_book_member_parser, book_member
from mini_project_1.cancel_booking import get_cancel_booking_parser
from mini_project_1.common import ShellArgumentException, \
MINI_PROJECT_DATE_FMT, get_location_id, ValueNotFoundException, \
get_selection, send_message, check_valid_email, check_valid_lcode
from mini_project_1.delete_request import get_delete_request_parser
from mini_project_1.list_bookings import get_list_bookings_parser
from mini_project_1.list_requests import get_list_ride_requests_parser
from mini_project_1.loginsession import LoginSession
from mini_project_1.logout import get_logout_parser
from mini_project_1.register import valid_password, \
register_member, valid_name, valid_phone, valid_email
from mini_project_1.offer_ride import get_offer_ride_parser, \
check_valid_cno, offer_ride
from mini_project_1.post_request import get_post_request_parser, \
valid_location_code
from mini_project_1.search_requests import \
get_search_requests_city_parser, \
get_search_requests_lcode_parser, print_5_and_prompt
from mini_project_1.search_rides import get_search_for_ride_parser
from mini_project_1.select_request import get_select_request_parser
from mini_project_1.show_inbox import get_show_inbox_parser
__log__ = getLogger(__name__)
[docs]def logged_in(f):
"""Annotation to check if someone is logged in before attempting a
command in the :class:`.MainProjectShell`"""
def wrapper(*args):
if args[0].login_session:
return f(*args)
else:
__log__.error("you must be logged in to use this function")
return wrapper
[docs]class MiniProjectShell(cmd.Cmd):
"""Main shell for mini-project-1"""
intro = \
"Welcome to mini-project-1 shell. Type help or ? to list commands\n"
prompt = "mini-project-1>"
login_session: LoginSession = None
def __init__(self, database: sqlite3.Connection,
register_start: bool = False):
"""Initialize the mini-project-1 shell
:param database: :class:`sqlite3.Connection` to the database to
interact with the mini-project-1 shell
"""
super().__init__()
self.database = database
self.register_start = register_start
[docs] def cmdloop(self, intro=None):
# start a login command at start.
if self.register_start:
self.do_register(None)
self.do_login(None)
self.do_show_inbox(None)
super().cmdloop()
# ===============================
# Shell command definitions
# ===============================
[docs] def do_login(self, arg):
"""Login to the mini-project-1 database"""
if self.login_session:
__log__.error("already logged in")
else:
print("Login to mini-project-1 database:")
username = str(input("username: "))
password = getpass("password: ")
self.login(username, password)
if not self.login_session:
self.do_login(None)
[docs] @logged_in
def do_logout(self, arg):
"""Logout from the mini-project-1 database"""
parser = get_logout_parser()
try:
parser.parse_args(arg.split())
self.logout()
except ShellArgumentException:
__log__.exception("invalid logout arguement")
[docs] @staticmethod
def help_logout():
"""Print the argparser help message for logout"""
get_logout_parser().print_help()
[docs] def do_exit(self, arg):
"""Logout (if needed) and exit out of the mini-project-1 shell"""
if self.login_session:
self.logout()
__log__.info("exiting mini-project-1 shell")
self.database.close()
return True
[docs] @logged_in
def do_show_inbox(self, arg):
"""View all unseen (seen="n") inbox messages related to the
currently logged in email
Set all viewed messages as seen="y"
"""
parser = get_show_inbox_parser()
try:
# view all messages within your inbox
inbox_items = self.database.execute(
"SELECT DISTINCT email, msgTimestamp, sender, content, rno, seen "
"FROM inbox "
"WHERE inbox.email = ? AND inbox.seen = 'n'",
(self.login_session.get_email(),)
).fetchall()
if inbox_items:
print("Your inbox:")
for inbox_item in inbox_items:
print(inbox_item)
# set all messages within your inbox as seen="y"
self.database.execute(
"UPDATE inbox "
"SET seen='y' "
"WHERE inbox.email = ?",
(self.login_session.get_email(),)
)
self.database.commit()
else:
print("No new messages")
except ShellArgumentException:
__log__.exception("invalid show_inbox argument")
[docs] @staticmethod
def help_show_inbox():
"""Print the argparser help message for show_inbox"""
get_show_inbox_parser().print_help()
[docs] @logged_in
def do_offer_ride(self, arg):
"""Offer a ride"""
dbcursor = self.database.cursor()
parser = get_offer_ride_parser()
try:
args = parser.parse_args(arg.split())
try:
source = \
get_location_id(dbcursor, args.src,
"Choose a source: ")
destination = \
get_location_id(dbcursor, args.dst,
"Choose a destination: ")
except ValueNotFoundException as e:
print(e)
raise ShellArgumentException
enroute = set()
for place in args.enroute:
try:
enroute.add(get_location_id(dbcursor, place, "Which place did you want to add? "))
except ValueNotFoundException as e:
print(e)
if not check_valid_cno(dbcursor, args.cno, self.login_session):
args.cno = None
if offer_ride(self.database, self.login_session, args.date.to_datetime_string(), args.seats, args.price,
args.luggage, source, destination, args.cno, enroute):
print("Added ride")
else:
print("Could not add ride")
except ShellArgumentException:
__log__.error("invalid offer_ride argument")
[docs] @staticmethod
def help_offer_ride():
"""Print the argparser help message for offer_ride"""
get_offer_ride_parser().print_help()
[docs] @logged_in
def do_search_rides(self, arg):
"""Search for ride"""
dbcursor = self.database.cursor()
parser = get_search_for_ride_parser()
try:
args = parser.parse_args(arg.split())
search_conditions = "(l1.lcode LIKE ? OR l1.city LIKE ? OR l1.prov LIKE ? OR l1.address LIKE ? OR " + \
"l2.lcode LIKE ? OR l2.city LIKE ? OR l2.prov LIKE ? OR l2.address LIKE ? OR " + \
"l3.lcode LIKE ? OR l3.city LIKE ? OR l3.prov LIKE ? OR l3.address LIKE ?)"
# setup query conditions for one term
search_string = search_conditions
search_vars = (args.term1, '%'+args.term1+'%', '%'+args.term1+'%', '%'+args.term1+'%',
args.term1, '%'+args.term1+'%', '%'+args.term1+'%', '%'+args.term1+'%',
args.term1, '%'+args.term1+'%', '%'+args.term1+'%', '%'+args.term1+'%')
# add query conditions for the second term if supplied
if args.term2:
search_string = search_string + " AND " + search_conditions
more_search_vars = (args.term2, '%'+args.term2+'%', '%'+args.term2+'%', '%'+args.term2+'%',
args.term2, '%'+args.term2+'%', '%'+args.term2+'%', '%'+args.term2+'%',
args.term2, '%'+args.term2+'%', '%'+args.term2+'%', '%'+args.term2+'%')
search_vars = search_vars + more_search_vars
# add query conditions for the third term if supplied
if args.term3:
search_string = search_string + " AND " + search_conditions
more_search_vars = (args.term3, '%'+args.term3+'%', '%'+args.term3+'%', '%'+args.term3+'%',
args.term3, '%'+args.term3+'%', '%'+args.term3+'%', '%'+args.term3+'%',
args.term3, '%'+args.term3+'%', '%'+args.term3+'%', '%'+args.term3+'%')
search_vars = search_vars + more_search_vars
query = "SELECT r.* " + \
"FROM locations l1, locations l2, rides r " + \
"LEFT JOIN enroute en on en.rno = r.rno " + \
"LEFT JOIN locations l3 on en.lcode = l3.lcode " + \
"WHERE l1.lcode = r.src AND l2.lcode = r.dst AND "+search_string+";"
dbcursor.execute(query, search_vars)
results = dbcursor.fetchall()
# display matching
if len(results):
selection = get_selection(results)
# message the posting ride member
if selection:
send_message(
self.database,
selection[7],
self.login_session.get_email(),
"I want to book seats on this ride",
selection[0]
)
print("Message sent to driver")
else:
print("No results")
except ShellArgumentException:
__log__.error("invalid search_rides argument")
[docs] @staticmethod
def help_search_rides():
"""Print the argparser help message for search_rides"""
get_search_for_ride_parser().print_help()
[docs] @logged_in
def do_list_bookings(self, arg):
"""List all the bookings that the user offers"""
parser = get_list_bookings_parser()
try:
parser.parse_args(arg.split())
cur = self.database.cursor()
cur.execute(
'SELECT DISTINCT bookings.* '
'FROM bookings, rides '
'WHERE rides.driver = ? '
'AND rides.rno = bookings.rno;',
(self.login_session.get_email(),)
)
rows = cur.fetchall()
for row in rows:
print(row)
except ShellArgumentException:
__log__.exception("invalid list_bookings argument")
[docs] @staticmethod
def help_list_bookings():
"""Print the argparser help message for list_bookings"""
get_list_bookings_parser().print_help()
[docs] @logged_in
def do_book_member(self, arg):
"""Book other members on a ride"""
parser = get_book_member_parser()
try:
args = parser.parse_args(arg.split())
# ensure valid inputs
if not check_valid_lcode(self.database, args.pickup):
print("Pickup locde not valid")
raise ShellArgumentException
if not check_valid_lcode(self.database, args.dropoff):
print("Dropoff locde not valid")
raise ShellArgumentException
if not check_valid_email(self.database, args.email):
print("Email not valid")
raise ShellArgumentException
# list my rides
cur = self.database.cursor()
cur.execute(
'SELECT * '
'FROM rides '
'WHERE rides.driver = ?;',
(self.login_session.get_email(),)
)
rows = cur.fetchall()
ride = get_selection(rows, "Enter the row number for the ride: ")
rno = ride[0]
seats_available = ride[3]
# check how many seats booked
cur.execute(
'SELECT sum(seats) '
'FROM bookings '
'WHERE bookings.rno = ?;',
(rno,)
)
seats_taken = cur.fetchone()
if not seats_taken:
seats_taken = 0
else:
seats_taken = seats_taken[0]
# book seats if available or user accepts overbooking
if seats_available < seats_taken + args.seats:
if str(input("Warning: ride will be overbooked. Continue: [y] or [n]") == 'y'):
book_member(self.database, rno, args.email, args.seats, args.price, args.pickup, args.dropoff)
send_message(self.database, args.email, self.login_session.get_email(),
"I have booked you on a ride", rno)
else:
book_member(self.database, rno, args.email, args.seats, args.price, args.pickup, args.dropoff)
send_message(self.database, args.email, self.login_session.get_email(),
"I have booked you on a ride", rno)
except ShellArgumentException:
__log__.error("invalid book_member argument")
[docs] @staticmethod
def help_book_member():
"""Print the argparser help message for book_member"""
get_search_for_ride_parser().print_help()
[docs] @logged_in
def do_cancel_booking(self, arg):
"""Cancel a booking"""
cur = self.database.cursor()
parser = get_cancel_booking_parser()
try:
args = parser.parse_args(arg.split())
cur.execute(
"SELECT bookings.* "
"FROM bookings, rides "
"WHERE bookings.bno = ? "
"AND rides.driver = ? "
"AND bookings.rno = rides.rno",
(args.bno, self.login_session.get_email(),)
)
to_delete = cur.fetchone()
if not to_delete:
print("You don't have a booking where bno={}".format(args.bno))
print("Your bookings:")
self.do_list_bookings("")
return
cur.execute(
"DELETE FROM bookings "
"WHERE EXISTS("
"SELECT * "
"FROM bookings b2, rides "
"WHERE b2.bno = ?"
"AND bookings.bno = b2.bno "
"AND rides.driver = ?"
"AND b2.rno = rides.rno)",
(args.bno, self.login_session.get_email(),)
)
self.database.commit()
print("Successfully deleted:\n{}".format(to_delete))
cur.execute(
"INSERT INTO inbox VALUES (?, ?, ?, ?, ?, ?);",
(to_delete[1], pendulum.now().to_datetime_string(),
self.login_session.get_email(),
"Your booking has been cancelled.",
to_delete[2], "n")
)
self.database.commit()
print("Successfully sent cancellation message to {}."
.format(to_delete[1]))
except ShellArgumentException:
__log__.exception("invalid cancel_booking argument")
[docs] @staticmethod
def help_cancel_booking():
"""Print the argparser help message for cancel_booking"""
get_cancel_booking_parser().print_help()
[docs] @logged_in
def do_post_request(self, arg):
"""Post a ride request"""
parser = get_post_request_parser()
try:
args = parser.parse_args(arg.split())
# generate a new rid
max_rid = self.database.execute(
"select max(r.rid) from requests r").fetchone()[0]
if not max_rid:
max_rid = 0
rid = 1 + int(max_rid)
# validate the given location codes
if not valid_location_code(self.database, args.pickup):
raise ShellArgumentException(
"invalid location code: {}".format(args.pickup))
if not valid_location_code(self.database, args.dropoff):
raise ShellArgumentException(
"invalid location code: {}".format(args.dropoff))
# create and insert the new ride request
self.database.execute(
"INSERT INTO requests VALUES (?, ?, ?, ?, ?, ?)",
(rid, self.login_session.get_email(),
args.date.strftime(MINI_PROJECT_DATE_FMT),
args.pickup, args.dropoff, args.price)
)
self.database.commit()
except ShellArgumentException:
__log__.exception("invalid post_ride_request argument")
else:
__log__.info(
"successfully posted ride request: "
"rid: {} email: {} date: {} pickup: {} dropoff: {} price: {}"
"".format(
rid, self.login_session.get_email(),
args.date.strftime(MINI_PROJECT_DATE_FMT),
args.pickup, args.dropoff, args.price
)
)
[docs] @staticmethod
def help_post_request():
"""Print the argparser help message for post_request"""
get_post_request_parser().print_help()
[docs] @logged_in
def do_list_requests(self, arg):
"""List all the user's ride requests"""
parser = get_list_ride_requests_parser()
try:
parser.parse_args(arg.split())
cur = self.database.cursor()
cur.execute(
'SELECT DISTINCT * '
'FROM requests '
'WHERE email = ?',
(self.login_session.get_email().lower(),)
)
rows = cur.fetchall()
for row in rows:
print(row)
except ShellArgumentException:
__log__.exception("invalid list_requests argument")
[docs] @staticmethod
def help_list_requests():
"""Print the argparser help message for list_requests"""
get_list_ride_requests_parser().print_help()
[docs] @logged_in
def do_search_requests_lcode(self, arg):
"""Search for a ride request by location number"""
cur = self.database.cursor()
parser = get_search_requests_lcode_parser()
try:
args = parser.parse_args(arg.split())
cur.execute(
'SELECT DISTINCT requests.* '
'FROM requests '
'WHERE pickup like ?',
(args.lcode,)
)
rows = cur.fetchall()
print_5_and_prompt(rows)
except ShellArgumentException:
__log__.exception("invalid argument")
[docs] @staticmethod
def help_search_requests_lcode():
"""Print the argparser help message for searching ride requests by
location code"""
get_search_requests_lcode_parser().print_help()
[docs] @logged_in
def do_search_requests_city(self, arg):
"""Search for a ride quest by city name"""
cur = self.database.cursor()
parser = get_search_requests_city_parser()
try:
args = parser.parse_args(arg.split())
cur.execute(
'SELECT DISTINCT requests.* '
'FROM requests, locations '
'WHERE requests.pickup = locations.lcode '
'AND locations.city LIKE ?',
(args.city.lower(),)
)
rows = cur.fetchall()
print_5_and_prompt(rows)
except ShellArgumentException:
__log__.exception("invalid argument")
[docs] @staticmethod
def help_search_requests_city():
"""Print the argparser help message for searching ride requests
by city name"""
get_search_requests_city_parser().print_help()
[docs] @logged_in
def do_delete_request(self, arg):
"""Delete a ride request"""
cur = self.database.cursor()
parser = get_delete_request_parser()
try:
args = parser.parse_args(arg.split())
cur.execute(
"SELECT DISTINCT * "
"FROM requests "
"WHERE rid = ? AND email = ?",
(args.rid, self.login_session.get_email(),)
)
to_delete = cur.fetchall()
if len(to_delete) == 0:
print("You don't have a ride request where rid={}"
.format(args.rid))
print("Your requests:")
self.do_list_requests("")
return
cur.execute(
"DELETE "
"FROM requests "
"WHERE rid = ? AND email = ?",
(args.rid, self.login_session.get_email(),)
)
self.database.commit()
print("Successfully deleted:\n{}".format(to_delete))
except ShellArgumentException:
__log__.exception("invalid argument")
[docs] @staticmethod
def help_delete_request():
"""Print the argparser help message for deleting a ride request"""
get_delete_request_parser().print_help()
[docs] @logged_in
def do_select_request(self, arg): # TODO make testable
"""Select a ride request and perform actions"""
cur = self.database.cursor()
parser = get_select_request_parser()
try:
args = parser.parse_args(arg.split())
cur.execute(
"SELECT * "
"FROM requests "
"WHERE rid = ?",
(args.rid,)
)
selected = cur.fetchone()
if selected is None:
print("There is no ride request with rid={}".format(args.rid))
return
print("You have selected: {}".format(selected))
while True:
response = \
input("Would you like to message the poster? [y|n]\n")
if response == "y":
message = input("Your message: ")
cur.execute(
"SELECT email "
"FROM requests "
"WHERE rid = ?",
(args.rid,)
)
poster = cur.fetchone()[0]
cur.execute(
"INSERT INTO inbox VALUES (?, ?, ?, ?, ?, ?);",
(poster,
pendulum.now().to_datetime_string(),
self.login_session.get_email(),
message,
None,
"n")
)
self.database.commit()
print("Successfully sent message to {}".format(poster))
break
elif response == "n":
break
except ShellArgumentException:
__log__.error("invalid argument")
[docs] @staticmethod
def help_select_request():
"""Print the argparser help message for selecting a ride request"""
get_select_request_parser().print_help()
[docs] def do_register(self, arg):
"""Register a new member to the mini-project-1 database"""
# get a valid email
print("Starting member registration wizard:")
while True:
email_str = input("email: ")
if valid_email(self.database, email_str):
email_str = valid_email(self.database, email_str)
break
# get valid name
while True:
name_str = input("name: ")
if valid_name(name_str):
break
# get valid phone
while True:
phone_str = input("phone: ")
if valid_phone(phone_str):
phone_str = valid_phone(phone_str)
break
# get valid password
while True:
password1 = getpass("password: ")
if not valid_password(password1):
continue
password2 = getpass("validate password: ")
if password1 != password2:
print("passwords do not match")
continue
else:
break
# finally register the new user
register_member(
self.database, email_str, name_str, phone_str, password1)
# ===============================
# Shell functionality definitions
# ===============================
[docs] @logged_in
def logout(self):
"""Logout method
Set the shell's ``login_session`` to :obj:`None`.
"""
email = self.login_session.get_email()
self.login_session = None
__log__.info("logged out user: {}".format(email))
[docs] def login(self, email: str, password: str):
"""Login method
Check if a :class:`LoginSession` already exists for the shell if not
attempt to login with the given email and password.
If the login attempt is successful set the shell's ``login_session``
to the newly created :class:`.loginsession.LoginSession`.
"""
if self.login_session:
__log__.error("already logged in as user: {}".format(
self.login_session.get_email()))
else:
user_hit = self.database.execute(
"SELECT email, pwd "
"FROM members "
"WHERE email = ? AND pwd = ?",
(email.lower(), password)
).fetchone()
if user_hit:
self.login_session = LoginSession(user_hit[0], user_hit[1])
__log__.info("logged in user: {}".format(user_hit[0]))
else:
__log__.warning("invalid login: bad username/password")