#!/usr/bin/env python # coding=utf-8 #Author:yang import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(BASE_DIR) from Atm.core import main if __name__ == ‘__main__‘: main.admin_run()
atm.bin.admin
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(BASE_DIR) from Atm.core import main if __name__ == ‘__main__‘: main.run()
atm.bin.user
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,sys import logging BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DATABASE = { ‘engine‘: ‘file_storage‘, # support mysql,postgresql in the future ‘name‘: ‘accounts‘, ‘path‘: "%s\\db" % BASE_DIR } LOG_LEVEL = logging.INFO LOG_TYPES = { ‘transaction‘: ‘transactions.log‘, ‘access‘: ‘access.log‘, } LOG_DATABASE = { ‘engine‘: ‘file_storage‘, # support mysql,postgresql in the future ‘name‘: ‘accounts‘, ‘path‘: "%s/log" % BASE_DIR } TRANSACTION_TYPE = { ‘repay‘: {‘action‘: ‘plus‘, ‘interest‘: 0}, # 还款 ‘receive‘: {‘action‘: ‘plus‘, ‘interest‘: 0}, # 接收 ‘withdraw‘: {‘action‘: ‘minus‘, ‘interest‘: 0.05}, # 提款 ‘transfer‘: {‘action‘: ‘minus‘, ‘interest‘: 0.05}, # 转出 ‘pay‘: {‘action‘: ‘minus‘, ‘interest‘: 0}, # 支付 ‘save‘: {‘action‘: ‘plus‘, ‘interest‘: 0}, # 存钱 }
atm.conf.setting
#!/usr/bin/env python # coding=utf-8 #Author:yang from Atm.conf import setting from Atm.core import accounts from Atm.core import logger def make_transaction(log_obj,account_data ,type,amount,**others): #判断交易类型 amount = float(amount) old_balance = account_data[‘balance‘] if type in setting.TRANSACTION_TYPE: interest = amount * setting.TRANSACTION_TYPE[type][‘interest‘] if setting.TRANSACTION_TYPE[type][‘action‘] == ‘plus‘: new_balance = old_balance + amount + interest elif setting.TRANSACTION_TYPE[type][‘action‘] == ‘minus‘: new_balance = old_balance - amount - interest if new_balance < 0: print(‘您的信用卡额度不足!当前余额为%s‘%old_balance) return account_data[‘balance‘] = new_balance accounts.dump_account(account_data) log_obj.info("account:%s action:%s amount:%s interest:%s" % (account_data[‘id‘], type, amount, interest)) return account_data
atm.core.transaction
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,time,datetime,json from Atm.core import auth from Atm.core import logger from Atm.core import transaction from Atm.core import accounts #交易日志 trans_logger = logger.logger(‘transaction‘) #登录日志 access_logger = logger.logger(‘access‘) #临时的账户数据 user_data = {‘auth_id‘:None, ‘auth_status‘:False, ‘auth_data‘:None } def account_info(user_data): #查看帐户信息 print(‘\033[32;1m您的帐户信息为:\033[0m‘) for k in user_data[‘auth_data‘]: if k == ‘password‘: continue else: print(‘\033[32;1m{}:{}\033[0m‘.format(k,user_data[‘auth_data‘].get(k))) def repay(user_data): #还款 account_data = user_data[‘auth_data‘] cost = account_data[‘credit‘] - account_data[‘balance‘] balance_message = ‘‘‘--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s ‘‘‘ % (account_data[‘credit‘], account_data[‘balance‘], cost) print(balance_message) tsg = False while not tsg: amount = input(‘请输入您的还款金额(按"b"返回):‘) if amount.isdigit(): new_balance = transaction.make_transaction(trans_logger,account_data ,‘repay‘,amount) time.sleep(0.1) # 处理显示问题 if new_balance: print(‘‘‘\033[42;1m现在账户余额为%s\033[0m‘‘‘ % (new_balance[‘balance‘])) elif amount == ‘b‘: tsg = True else: print(‘请输入正确的还款金额(数字)!‘) def withdraw(user_data): #取款 account_data = user_data[‘auth_data‘] cost = account_data[‘credit‘] - account_data[‘balance‘] balance_message = ‘‘‘--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s ‘‘‘ %(account_data[‘credit‘],account_data[‘balance‘],cost) print(balance_message) tsg = False while not tsg: amount = input(‘请输入您的取款金额(按"b"返回):‘) if amount.isdigit(): new_balance = transaction.make_transaction(trans_logger,account_data ,‘withdraw‘,amount) time.sleep(0.1) # 处理显示问题 if new_balance: print(‘‘‘\033[42;1m现在账户余额为%s\033[0m‘‘‘ % (new_balance[‘balance‘])) elif amount == ‘b‘: tsg = True else: print(‘请输入正确的取款金额(数字)!‘) def transfer(user_data): #转账 account_data = user_data[‘auth_data‘] cost = account_data[‘credit‘] - account_data[‘balance‘] balance_message = ‘‘‘--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s ‘‘‘ % (account_data[‘credit‘], account_data[‘balance‘], cost) print(balance_message) tsg = False while not tsg: receiver = input(‘请输入被转账人id(按"b"返回):‘) if receiver == user_data["auth_id"]: # 判断帐号是否为自己 print("\033[31;1m用户为自己\033[0m") continue elif receiver == "b": break else: receiver_account_data = auth.acc_check(receiver) if receiver_account_data == False: print(‘用户不存在!‘) continue else: status = receiver_account_data[‘status‘] if status == 0: amount = input(‘请输入您的转账金额(按"b"返回):‘) if amount.isdigit(): new_balance = transaction.make_transaction(trans_logger,account_data ,‘transfer‘,amount) transaction.make_transaction(trans_logger, receiver_account_data, ‘receive‘,amount ) time.sleep(0.1) # 处理显示问题 if new_balance: print(‘‘‘转账成功,\033[42;1m现在账户余额为%s\033[0m‘‘‘ % (new_balance[‘balance‘])) elif amount == ‘b‘: tsg = True else: print(‘请输入正确的金额(数字)!‘) def save(user_data): #存款 account_data = user_data[‘auth_data‘] cost = account_data[‘credit‘] - account_data[‘balance‘] balance_message = ‘‘‘--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s ‘‘‘ % (account_data[‘credit‘], account_data[‘balance‘], cost) print(balance_message) tsg = False while not tsg: amount = input(‘请输入您的存款金额(按"b"返回):‘) if amount.isdigit(): new_balance = transaction.make_transaction(trans_logger,account_data ,‘save‘,amount) time.sleep(0.1) # 处理显示问题 if new_balance: print(‘‘‘\033[42;1m现在账户余额为%s\033[0m‘‘‘ % (new_balance[‘balance‘])) elif amount == ‘b‘: tsg = True else: print(‘请输入正确的存款金额(数字)!‘) def logout(user_data): #用户登出 exit(‘用户%s退出成功!‘%user_data[‘auth_id‘]) def sign_up(user_data): #用户注册界面 print(‘----------欢迎来到用户注册界面!---------‘) auth.acc_sign_up() def lock(user_data): #锁定或者解锁用户 while True: acc_id = input(‘请输入您要修改的用户id(按“b”返回):‘) if acc_id == ‘b‘: print(‘返回成功!‘) break elif accounts.exist_account(acc_id): acc_data = accounts.load_account(acc_id) now = datetime.datetime.now() times = datetime.timedelta(days=3650) exp_time_stamp = time.mktime(time.strptime(acc_data[‘expire_date‘], ‘%Y-%m-%d‘)) if time.time() > exp_time_stamp: lock_type = input(‘此帐户已经锁定!是否解锁(Y/N):‘) if lock_type == ‘Y‘: acc_data[‘expire_date‘] = (now + times).strftime(‘%Y-%m-%d‘) accounts.dump_account(acc_data) print(‘解锁成功!‘) else: print(‘未进行任何操作‘) if time.time() < exp_time_stamp: lock_type = input(‘此帐户正常!是否锁定(Y/N):‘) if lock_type == ‘Y‘: acc_data[‘expire_date‘] = (now - times).strftime(‘%Y-%m-%d‘) accounts.dump_account(acc_data) print(‘锁定成功!‘) else: print(‘未进行任何操作‘) else: print(‘用户不存在!‘) def pay_log(user_data): #打印消费流水 while True: base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) auth_id = user_data[‘auth_id‘] file = ‘%s\\shopping\\log\\%s_shopping.log‘ % (base_dir, auth_id) if os.path.exists(file): with open(file,‘r‘) as f: print(‘该用户流水账单如下:‘) print(‘------------------------------‘) print(f.read()) print(‘------------------------------‘) else: print(‘该用户没有消费记录!‘) choise = input(‘按“b”返回:‘) if choise == ‘q‘: break def improve(user_data): #调整额度 while True: acc_id = input(‘请输入您要调整额度的用户id(按“b”返回):‘) if acc_id == ‘b‘: print(‘返回成功!‘) break elif accounts.exist_account(acc_id): acc_data = accounts.load_account(acc_id) credit = input(‘请输入需要调整的额度:‘) if credit.isdigit(): acc_data[‘credit‘] = int(credit) accounts.dump_account(acc_data) print(‘修改成功!‘) else: print(‘请输入数字!‘) else: print(‘用户不存在!‘) @auth.login_auth #装饰器,验证是否为普通用户 def interactive(user_data): #普通用户菜单 menu = u‘‘‘ ------- ATM---------\033[32;1m 1. 账户信息 2. 还款 3. 取款 4. 转账 5. 存款 6. 消费流水 7. 退出 \033[0m‘‘‘ menu_dic = { ‘1‘: account_info, ‘2‘: repay, ‘3‘: withdraw, ‘4‘: transfer, ‘5‘: save, ‘7‘: logout, ‘6‘: pay_log } exit_flag = False while not exit_flag: print(menu) user_choise = input(‘>>:‘).strip() if user_choise in menu_dic: menu_dic[user_choise](user_data) else: print("选择不存在") @auth.login_auth_admin #装饰器,验证是否为管理员用户 def admin_interactive(user_data): #管理员菜单 menu = u‘‘‘ ------- 管理员模式---------\033[32;1m 1. 账户信息 2. 注册用户 3. 解锁以及锁定用户 4. 调整额度 5. 退出 \033[0m‘‘‘ menu_dic = { ‘1‘: account_info, ‘2‘: sign_up, ‘3‘: lock, ‘4‘: improve, ‘5‘: logout, } exit_flag = False while not exit_flag: print(menu) user_choise = input(‘>>:‘).strip() if user_choise in menu_dic: menu_dic[user_choise](user_data) else: print("选择不存在") def get_user_data(): #进行用户认证 account_data = auth.acc_login(user_data,access_logger) if user_data[‘auth_status‘]: user_data[‘auth_data‘] = account_data return user_data else: return None def run(): #定义接口函数 print(‘欢迎来到ATM!‘) user_data = get_user_data() interactive(user_data) def admin_run(): #定义管理员接口函数 print(‘欢迎管理员登录ATM!‘) user_data = get_user_data() admin_interactive(user_data)
atm.core.main
#!/usr/bin/env python # coding=utf-8 #Author:yang import logging from Atm.conf import setting def logger(log_type): # create logger logger = logging.getLogger(log_type) logger.setLevel(setting.LOG_LEVEL) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(setting.LOG_LEVEL) # create file handler and set level to warning log_file = "%s\log\%s" % (setting.BASE_DIR, setting.LOG_TYPES[log_type]) fh = logging.FileHandler(log_file) fh.setLevel(setting.LOG_LEVEL) # create formatter formatter = logging.Formatter(‘%(asctime)s - %(name)s - %(levelname)s - %(message)s‘) # add formatter to ch and fh ch.setFormatter(formatter) fh.setFormatter(formatter) # add ch and fh to logger logger.addHandler(ch) logger.addHandler(fh) return logger
atm.core.logger
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,json from Atm.conf import setting def file_execute(sql,**kwargs): #对用户进行认证,看用户名是否存在在文件中 conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘],conn_params[‘name‘]) sql = sql.split(‘where‘) if sql[0].strip().startswith(‘select‘) and len(sql) > 1: column,val = sql[1].strip().split(‘=‘) if column == ‘username‘: account_file = ‘%s\\%s.json‘ %(db_path,val) if os.path.isfile(account_file): with open(account_file,‘r‘) as f: account_data = json.load(f) return account_data else: exit(‘用户不存在‘) def file_db_handle(conn_params): return file_execute def db_handler(): #连接数据 conn_params = setting.DATABASE if conn_params[‘engine‘] == ‘file_storage‘: return file_db_handle(conn_params)
atm.core.dbhandler
#!/usr/bin/env python # coding=utf-8 #Author:yang import json,time,os from Atm.core import db_handler from Atm.conf import setting from Atm.core import accounts def login_auth(func): #判断是否为普通用户 def wapper(user_data): if user_data[‘auth_data‘][‘status‘] == 0: func(user_data) else: print(‘您是管理员用户,请重新登录!‘) return wapper def login_auth_admin(func): #判断是否为管理员用户 def wapper(user_data): if user_data[‘auth_data‘][‘status‘] == 1: func(user_data) else: print(‘%s您好,您不是管理员用户请重新登录!‘%user_data[‘auth_id‘]) return wapper def acc_count(username,password): #验证登录 db_api = db_handler.db_handler() data = db_api(‘select * from accounts where username=%s‘%username) if data[‘password‘] == password: exp_time_stamp = time.mktime(time.strptime(data[‘expire_date‘],‘%Y-%m-%d‘)) if time.time() > exp_time_stamp: print(‘账户过期‘) else: return data else: print(‘用户名密码错误,请重新输入!‘) def acc_check(id): #检查用户信息是否存在,存在返回数据,不存在返回False conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, id) if os.path.isfile(account_file): with open(account_file, ‘r‘) as f: account_data = json.load(f) return account_data else: return False def acc_login(user_data,log_obj): #定义acc_Login函数,判断用户是否登录过 retry_count =0 while user_data[‘auth_status‘] is not True and retry_count <3 : username = input(‘请输入用户名:‘).strip() password = input(‘请输入密码:‘).strip() acc_data = acc_count(username,password) if acc_data: user_data[‘auth_status‘] = True user_data[‘auth_id‘] = username print(‘登录成功!‘) log_obj.info("account [%s] login success" % username) return acc_data retry_count +=1 else: log_obj.error("account [%s] too many login attempts" % username) exit() def acc_sign_up(): retry_count = 0 tag = False while not tag: username = input(‘请输入用户名(按b返回):‘).strip() if accounts.exist_account(username): print(‘用户已存在‘) elif username == ‘b‘: break elif username != ‘‘: password = input(‘请输入密码:‘).strip() account_data = {"enroll_date": time.strftime("%Y-%m-%d"), "password": password, "id": username, "credit": 15000,"status": 0, "balance": 15000.0, "expire_date": "2021-01-01", "pay_day": 22} accounts.dump_account(account_data) print(‘用户%s创建成功!‘%username) tag = True else: print(‘请不要输入空值!‘)
atm.core.auth
#!/usr/bin/env python # coding=utf-8 #Author:yang import json,os from Atm.conf import setting def exist_account(id): #判断永辉是否存在 conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, id) if os.path.exists(account_file): return True def load_account(id): #读取文件 conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, id) with open(account_file,‘r‘) as f: acc_data = json.load(f) return acc_data def dump_account(account_data): #写入文件 conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, account_data[‘id‘]) with open(account_file,‘w‘) as f: acc_data = json.dump(account_data, f) return True
atm.core.accounts
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,sys from Atm.core import accounts def money(id): #用户存在,返回信用卡可用额度 if accounts.exist_account(id): money = accounts.load_account(id)[‘balance‘] return money else: return False def password(id): #用户存在返回用户密码 if accounts.exist_account(id): password = accounts.load_account(id)[‘password‘] return password else: return False def balance(id,money): #写入新额度 acc_data = accounts.load_account(id) acc_data[‘balance‘] = money accounts.dump_account(acc_data)
atm.pay_api.pay_api
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,sys bash_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(bash_dir) from shopping.core import main if __name__ == ‘__main__‘: main.run()
shopping.bin.shop
#!/usr/bin/env python # coding=utf-8 #Author:yang import os,logging BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DATABASE = { ‘engine‘: ‘file_storage‘, ‘name‘: ‘accounts‘, ‘path‘: "%s\\db" % BASE_DIR } LOG_LEVEL = logging.INFO LOG_TYPES = { ‘shopping‘: ‘shopping.log‘ }
shopping.conf.setting
#!/usr/bin/env python # coding=utf-8 #Author:yang import json,os from shopping.conf import setting def exist_account(id): conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, id) if os.path.exists(account_file): return True def load_account(id): conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, id) with open(account_file,‘r‘) as f: acc_data = json.load(f) return acc_data def dump_account(account_data): conn_params = setting.DATABASE db_path = ‘%s\\%s‘ % (conn_params[‘path‘], conn_params[‘name‘]) account_file = ‘%s\\%s.json‘ % (db_path, account_data[‘user_id‘]) with open(account_file,‘w‘) as f: acc_data = json.dump(account_data, f) return True
shopping.core.shopping_accounts
#!/usr/bin/env python # coding=utf-8 #Author:yang from Atm.pay_api import pay_api from shopping.core import shopping_accouts from shopping.core import logger shopping_list = [] user_data = { ‘auth_id‘: None, ‘money‘: None, ‘auth_goods‘:None } def recory(): user_id = user_data[‘auth_id‘] if shopping_accouts.exist_account(user_id): acc_data = shopping_accouts.load_account(user_id) user_data[‘auth_goods‘] = acc_data[‘goods‘] ask = input(‘是否查询之前的消费记录?(Y/N):‘) if ask == ‘Y‘: print(‘您之前购买的商品为%s‘%user_data[‘auth_goods‘]) else: user_data[‘auth_goods‘] = [] ask_two = input(‘是否进入购物?(Y/N):‘) if ask_two == ‘Y‘: log_type = "shopping" shopping_logger = logger.logger(log_type, user_id) shopping(shopping_logger, user_data) else: exit(‘退出成功!‘) def shopping(log_obj,user_data): salary = user_data[‘money‘] product_list = { ‘1‘: [‘Iphone‘, 5800], ‘2‘: [‘Mac Pro‘, 9800], ‘3‘: [‘Bike‘, 800], ‘4‘: [‘Watch‘, 10600], ‘5‘: [‘Coffee‘, 31], ‘6‘: [‘Alex Python‘, 120], } while True: print(‘--------------------------------‘) for k, v in product_list.items(): # 打印货物清单 print(k, v) choise = input("请输入你要买的商品编号(按q返回登录界面):") if choise.isdigit(): # 将输入的数字转换为int类型 goods = product_list[choise][0] value = product_list[choise][1] choise = int(choise) if choise <= len(product_list) and choise > 0: # 当输入的数字在范围内 shopping_list.append((goods,value)) # 添加到购物车 if value <= salary: # 判断价格是否大于工资 salary -= value # 大于就减少工资 print(‘购买成功,您所购买的商品为%s,所剩下的余额为\033[51;1m%s\033[0m\n‘ % (goods, salary)) log_obj.info("account:%s action:%s product_number:%s goods:%s cost:%s" % (user_data[‘auth_id‘], "shopping", choise, goods, value)) else: print(‘\033[21;1m你的钱不够,请选择其他商品。\033[0m\n‘) else: print(‘请输入商品编号!(按q返回登录界面)‘) elif choise == ‘q‘: # 输入q打印商品名称,将清单添加到records文件中,返回登陆界面。 print(‘以下是您买的商品‘) for i in shopping_list: user_data[‘money‘] = salary user_data[‘auth_goods‘].append(i) print(i) shopping_data = {‘user_id‘: user_data[‘auth_id‘], ‘goods‘: user_data[‘auth_goods‘]} shopping_accouts.dump_account(shopping_data) pay_api.balance(user_data[‘auth_id‘],user_data[‘money‘]) break else: print(‘请输入商品编号!(按q返回登录界面)‘) def interactive(): #判断用户登录 while True: username = input(‘请输入用户名(按"q"退出):‘) if username == ‘q‘: print(‘退出成功!‘) break else: passwd = pay_api.password(username) if passwd == False: print(‘用户不存在,请协调管理员注册!‘) else: password = input(‘请输入密码:‘) if password == passwd: money = pay_api.money(username) user_data[‘auth_id‘] = username user_data[‘money‘] = money print(‘\033[32;1m您的信用卡可用额度为:%s\033[0m‘ % money) print(‘登录成功!‘) recory() else: print(‘密码不正确!‘) def run(): print(‘\033[32;1m欢迎来到购物商城\033[0m‘.center(30,‘-‘)) interactive()
shopping.core.main
#!/usr/bin/env python # coding=utf-8 #Author:yang import logging from shopping.conf import setting def logger(log_type, *user_name): # create logger logger = logging.getLogger(log_type) logger.setLevel(setting.LOG_LEVEL) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(setting.LOG_LEVEL) # create file handler and set level to warning if user_name: log_file = "%s/log/%s_%s" % (setting.BASE_DIR, user_name[0], setting.LOG_TYPES[log_type]) else: log_file = "%s/log/%s" % (setting.BASE_DIR, setting.LOG_TYPES[log_type]) fh = logging.FileHandler(log_file) fh.setLevel(setting.LOG_LEVEL) # create formatter formatter = logging.Formatter(‘%(asctime)s - %(name)s - %(levelname)s - %(message)s‘) # add formatter to ch and fh ch.setFormatter(formatter) fh.setFormatter(formatter) # add ch and fh to logger logger.addHandler(ch) logger.addHandler(fh) return logger
shopping.core.logger
时间: 2024-10-14 12:39:48