1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
| from datetime import datetime from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from urllib.parse import quote_plus from urllib.parse import urlparse, parse_qs from base64 import decodebytes, encodebytes import json import requests
class AliPay(object): """ 支付宝支付接口(PC端支付接口) """
def __init__(self, appid, app_notify_url, app_private_key_path, alipay_public_key_path, return_url, debug=False): self.appid = appid self.app_notify_url = app_notify_url self.app_private_key_path = app_private_key_path self.app_private_key = None self.return_url = return_url with open(self.app_private_key_path) as fp: self.app_private_key = RSA.importKey(fp.read()) self.alipay_public_key_path = alipay_public_key_path with open(self.alipay_public_key_path) as fp: self.alipay_public_key = RSA.importKey(fp.read()) if debug is True: self.__gateway = "https://openapi.alipaydev.com/gateway.do" else: self.__gateway = "https://openapi.alipay.com/gateway.do"
def direct_pay(self, subject, out_trade_no, total_amount, return_url=None, **kwargs): biz_content = { "subject": subject, "out_trade_no": out_trade_no, "total_amount": total_amount, "product_code": "FAST_INSTANT_TRADE_PAY", }
biz_content.update(kwargs) data = self.build_body("alipay.trade.page.pay", biz_content, self.return_url) return self.sign_data(data) def build_body(self, method, biz_content, return_url=None): data = { "app_id": self.appid, "method": method, "charset": "utf-8", "sign_type": "RSA2", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "version": "1.0", "biz_content": biz_content }
if return_url is not None: data["notify_url"] = self.app_notify_url data["return_url"] = self.return_url
return data def sign_data(self, data): data.pop("sign", None) unsigned_items = self.ordered_data(data) unsigned_string = "&".join("{0}={1}".format(k, v) for k, v in unsigned_items) sign = self.sign(unsigned_string.encode("utf-8")) quoted_string = "&".join("{0}={1}".format(k, quote_plus(v)) for k, v in unsigned_items)
signed_string = quoted_string + "&sign=" + quote_plus(sign) return signed_string def ordered_data(self, data): complex_keys = [] for key, value in data.items(): if isinstance(value, dict): complex_keys.append(key)
for key in complex_keys: data[key] = json.dumps(data[key], separators=(',', ':'))
return sorted([(k, v) for k, v in data.items()])
def sign(self, unsigned_string): key = self.app_private_key signer = PKCS1_v1_5.new(key) signature = signer.sign(SHA256.new(unsigned_string)) sign = encodebytes(signature).decode("utf8").replace("\n", "") return sign
def _verify(self, raw_content, signature): key = self.alipay_public_key signer = PKCS1_v1_5.new(key) digest = SHA256.new() digest.update(raw_content.encode("utf8")) if signer.verify(digest, decodebytes(signature.encode("utf8"))): return True return False
def verify(self, data, signature): if "sign_type" in data: sign_type = data.pop("sign_type") unsigned_items = self.ordered_data(data) message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items) return self._verify(message, signature)
def api_alipay_trade_refund(self,refund_amount,out_trade_no=None,trade_no=None,**kwargs):
biz_content = { "refund_amount":refund_amount}
biz_content.update(**kwargs)
if out_trade_no: biz_content["out_trade_no"] = out_trade_no if trade_no: biz_content["trade_no"] = trade_no
data = self.build_body("alipay.trade.refund",biz_content)
url = self.__gateway+"?" + self.sign_data(data)
r = requests.get(url) html = r.content.decode("utf-8")
return html
|