重置 Git 索引,移除不需要的路径引用

This commit is contained in:
wystan_rin 2024-11-13 16:09:26 +08:00
parent f6272271ca
commit e634f7eaee
38 changed files with 0 additions and 1359 deletions

2
.gitignore vendored
View File

@ -1,2 +0,0 @@
*.log
/build/

8
.idea/.gitignore vendored
View File

@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -1,14 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="PublishConfigData" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@192.168.16.207:22 password">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
</component>
</project>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="Eslint" enabled="true" level="WARNING" enabled_by_default="true" />
</profile>
</component>

View File

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.11" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
</project>

View File

@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="Flask">
<option name="enabled" value="true" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
</component>
</module>

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/payment_headend" vcs="Git" />
</component>
</project>

View File

@ -1,32 +0,0 @@
/data/
# Python:
*.ipynb
*/__pycache__
/.vagrant
/scrapy.iml
*.pyc
_trial_temp*
dropin.cache
docs/build
*egg-info
.tox
venv
build
dist
.idea
htmlcov/
.coverage
.pytest_cache/
.coverage.*
.cache/
.mypy_cache/
/tests/keys/localhost.crt
/tests/keys/localhost.key
key.txt
key.txt.pub
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini

View File

@ -1,3 +0,0 @@
# payment
支付系统

View File

@ -1 +0,0 @@
from tronscan import Tronscan

View File

@ -1,411 +0,0 @@
import requests
from custom_decorators import singleton
from utils.tronscan import convert_to_tronscan_timestamp
trc20token_info = {
"usdt": {"tokenId": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
"tokenAbbr": "USDT",
"tokenName": "Tether USD",
"tokenLogo": "https://static.tronscan.org/production/logo/usdtlogo.png",
"issuerAddr": "THPvaUhoh2Qn2y9THCZML3H815hhFhn5YC",
"vip": True}
}
@singleton
class Tronscan:
def __init__(self, api_key):
self.api_key = api_key
def accountv2(self, address):
"""
Get account detail information
:param address: Account address
:return: Returns the detail information of an account.
"""
response = requests.get(f"https://apilist.tronscanapi.com/api/accountv2?address={address}",
headers={'TRON-PRO-API-KEY': self.api_key})
return response.json()
def transactions(self, start=0, limit=10, start_timestamp=None, end_timestamp=None,
from_address=None, to_address=None, tokens=None, block=None,
type_=None, method=None):
"""
Get a list of transactions.
:param start: Start number. Default 0
:param limit: Number of items per page. Default 10
:param start_timestamp: Start time
:param end_timestamp: End time
:param from_address: Sender's address.
:param to_address: Recipient's address.
:param tokens: Tokens involved
:param block: Block
:param type_: Transaction type
:param method: Method called in a smart contract signature. Only one value can be specified each time.
:return: Getx a list of transactions.
"""
params = {
"sort": "-timestamp",
"count": "true",
"start": start,
"limit": limit,
}
if start_timestamp is not None:
params["start_timestamp"] = convert_to_tronscan_timestamp(start_timestamp)
if end_timestamp is not None:
params["end_timestamp"] = convert_to_tronscan_timestamp(end_timestamp)
if from_address is not None:
params["fromAddress"] = from_address
if to_address is not None:
params["toAddress"] = to_address
if tokens is not None:
params["tokens"] = tokens
if block is not None:
params["block"] = block
if type_ is not None:
params["type"] = type_
if method is not None:
params["method"] = method
response = requests.get(
"https://apilist.tronscanapi.com/api/transaction",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def transaction_info(self, hash_):
"""
Get transaction detail information by transaction hash.
:param hash_: Transaction hash
:return: Get transaction information.
"""
params = {
"hash": hash_,
}
response = requests.get(
"https://apilist.tronscanapi.com/api/transaction-info",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def token_trc20_transfers(self, start=None, limit=None, contract_address=None,
start_timestamp=None, end_timestamp=None, confirm=True,
related_address=None, from_address=None, to_address=None):
"""
Get the transfer list of TRC20 and TRC721 tokens.
:param start: Start number. Default 0
:param limit: Number of items per page. Default 10
:param contract_address: Contract address
:param start_timestamp: Start time
:param end_timestamp: End time
:param confirm: Whether to return confirmed transfers only. Default: True
:param related_address: Account address
:param from_address: Sender's address
:param to_address: Recipient's address
:return: Get the transfer list of TRC20 and TRC721 tokens.
"""
params = {
"filterTokenValue": 1
}
if start is not None:
params["start"] = start
if limit is not None:
params["limit"] = limit
if contract_address is not None:
params["contract_address"] = contract_address
if start_timestamp is not None:
params["start_timestamp"] = convert_to_tronscan_timestamp(start_timestamp)
if end_timestamp is not None:
params["end_timestamp"] = convert_to_tronscan_timestamp(end_timestamp)
if confirm is not None:
params["confirm"] = str(confirm).lower()
if related_address is not None:
params["relatedAddress"] = related_address
if from_address is not None:
params["fromAddress"] = from_address
if to_address is not None:
params["toAddress"] = to_address
response = requests.get(
"https://apilist.tronscanapi.com/api/token_trc20/transfers",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def transfer(self, sort="-timestamp", start=0, limit=10, count="true",
address=None, from_address=None, to_address=None, tokens=None, block=None):
"""
Get account's transfer list.
:param sort: Sort type
:param start: Start index, default is 0
:param limit: Number of transfers per page
:param count: Whether to return total transfer number.
:param address: Address, like contract address
:param from_address: Sender's address
:param to_address: Recipient's address
:param tokens: Specific tokens
:param block: Block number
:return: Get account's transfer list.
"""
params = {
"sort": sort,
"start": start,
"limit": limit,
"count": count,
"filterTokenValue": 1
}
if address is not None:
params["address"] = address
if from_address is not None:
params["fromAddress"] = from_address
if to_address is not None:
params["toAddress"] = to_address
if tokens is not None:
params["tokens"] = tokens
if block is not None:
params["block"] = block
response = requests.get(
"https://apilist.tronscanapi.com/api/transfer",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def internal_transactions(self, start=0, limit=10, address=None, contract=None, block=None):
"""
Get internal transaction list for a specific address or block.
:param start: Start index, default is 0
:param limit: Number of transfers per page
:param address: Specific address. At least one of address, block, or contract must be specified
:param contract: Sender's address
:param block: Block number
:return: Get the internal transaction list.
"""
params = {
"start": start,
"limit": limit
}
if address is not None:
params["address"] = address
if contract is not None:
params["contract"] = contract
if block is not None:
params["block"] = block
response = requests.get(
"https://apilist.tronscanapi.com/api/internal-transaction",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def token_trc20_transfers_with_status(self, start=0, limit=10, trc20Id=None, address=None,
direction=0, db_version=0, reverse="false"):
"""
Get account's transaction data.
:param start: Start index, default is 0
:param limit: Number of transfers per page
:param trc20Id: TRC20 token address
:param address: Account address
:param direction: 0 for all, 1 for transfer-out, 2 for transfer-in
:param db_version: Whether to include approval transfers. 1 for include, 0 for exclude
:param reverse: Sort by creation time. Valid values: "true" or "false"
:return: Get account's transaction data.
"""
params = {
"start": start,
"limit": limit,
"direction": direction,
"db_version": db_version,
"reverse": reverse
}
if trc20Id is not None:
params["trc20Id"] = trc20Id
if address is not None:
params["address"] = address
response = requests.get(
"https://apilist.tronscanapi.com/api/token_trc20/transfers-with-status",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params
)
return response.json()
def search(self, term, type="token", start=0, limit=10):
"""
Search token/contract/account information
Note : The maximum value for limit is 50.
:param term: Search term
:param type: Search type, including "token", "address", "contract", "transaction" and "block"
:param start: Start number. Default: 0
:param limit: Number of items per page. Default: 10
:return: Returns account authorization change records.
"""
params = {
"term": term,
"type": type,
"start": start,
"limit": limit,
}
response = requests.get("https://apilist.tronscanapi.com/api/search/v2",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
def approve_change(self, contract_address, from_address, to_address, start=0, limit=20, show=3):
"""
Returns account authorization change records.
:param contract_address: Contract address
:param from_address: Originator address
:param to_address: Recipient address
:param start: Start number. Default: 0
:param limit: Number of items per page. Default: 20
:param show: Token type. 1: TRC20 2: TRC721 3: ALL(default) 4: TRC1155
:return:
"""
params = {
"contract_address": contract_address,
"from_address": from_address,
"to_address": to_address,
"start": start,
"limit": limit,
"show": show,
"type": "approve",
}
response = requests.get("https://apilist.tronscanapi.com/api/account/approve/change",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
def transfer_trx(self, address, start_timestamp=None, end_timestamp=None, start=0, limit=20, direction=1,
db_version=0,
reverse=True, fee=False):
"""
Get the list of trx transfers related to a specific address
Note : The value sum of start and limit must be less than or equal to 10000.
:param address: Query address
:param start_timestamp: Start timestamp
:param end_timestamp: End timestamp
:param start: Start number. Default: 0
:param limit: Number of items per page. Default: 20
:param direction: Default: 1. 1 represents inbound transfers, 2 represents outbound transfers, and 0 represents both.
:param db_version: Default: 0, which indicates to filter transfers with invalid to or from addresses out.
:param reverse: Sort the data in a descending order. Default: true
:param fee: Whether to return data of TRX burning for resource consumption. Default: false
:return: Returns the list of TRX transfers for a specific address.
"""
params = {
"address": address,
"start": start,
"limit": limit,
"direction": direction,
"db_version": db_version,
"reverse": reverse,
"fee": fee,
}
if start_timestamp is not None:
params["start_timestamp"] = convert_to_tronscan_timestamp(start_timestamp)
if end_timestamp is not None:
params["end_timestamp"] = convert_to_tronscan_timestamp(end_timestamp)
response = requests.get("https://apilist.tronscanapi.com/api/transfer/trx",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
def transfer_token10(self, address, trc10Id, start_timestamp=None, end_timestamp=None, start=0, limit=20,
direction=1, db_version=0, reverse=True):
"""
Get the transfer list of a specific TRC10 token for a certain address
Note : The value sum of start and limit must be less than or equal to 10000.
:param address: Query address
:param trc10Id: TRC10 token ID
:param start_timestamp: Start timestamp
:param end_timestamp: End timestamp
:param start: Start number. Default: 0
:param limit: Number of items per page. Default: 20
:param direction: Default: 1. 1 represents inbound transfers, 2 represents outbound transfers, and 0 represents both.
:param db_version: Default: 0, which indicates to filter transfers with invalid to or from addresses out.
:param reverse: Sort the data in a descending order. Default: true
:return: Returns the transfer list of a TRC10 token for a specific account.
"""
params = {
"address": address,
"trc10Id": trc10Id,
"start": start,
"limit": limit,
"direction": direction,
"db_version": db_version,
"reverse": reverse,
}
if start_timestamp is not None:
params["start_timestamp"] = convert_to_tronscan_timestamp(start_timestamp)
if end_timestamp is not None:
params["end_timestamp"] = convert_to_tronscan_timestamp(end_timestamp)
response = requests.get("https://apilist.tronscanapi.com/api/transfer/token10",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
def transfer_trc20(self, address, trc20Id, start_timestamp=None, end_timestamp=None, start=0, limit=20,
direction=1, db_version=0, reverse=True):
"""
Get the transfer list of a specific TRC20 token for a certain address
Note : The value sum of start and limit must be less than or equal to 10000.
:param address: Query address
:param trc20Id: TRC20 token ID
:param start_timestamp: Start timestamp
:param end_timestamp: End timestamp
:param start: Start number. Default: 0
:param limit: Number of items per page. Default: 20
:param direction: Default: 1. 1 represents inbound transfers, 2 represents outbound transfers, and 0 represents both.
:param db_version: Default: 0, which indicates to filter transfers with invalid to or from addresses out.
:param reverse: Sort the data in a descending order. Default: true
:return: Returns the transfer list of a TRC20 token for a specific account.
"""
params = {
"address": address,
"trc20Id": trc20Id,
"start": start,
"limit": limit,
"direction": direction,
"db_version": db_version,
"reverse": reverse,
}
if start_timestamp is not None:
params["start_timestamp"] = convert_to_tronscan_timestamp(start_timestamp)
if end_timestamp is not None:
params["end_timestamp"] = convert_to_tronscan_timestamp(end_timestamp)
response = requests.get("https://apilist.tronscanapi.com/api/transfer/trc20",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
def account_wallet(self, address, asset_type=0):
"""
Get the information of tokens held and followed in the account's web wallet
:param address: Query address
:param asset_type: Asset types: 0 - All (default); 1 - Assets (TRX, TRC10, TRC20); 2 - Collectibles (TRC721 and TRC1155)
:return: Returns a list of tokens held and followed by an account.
"""
params = {
"address": address,
"asset_type": asset_type,
}
response = requests.get("https://apilist.tronscanapi.com/api/account/wallet",
headers={'TRON-PRO-API-KEY': self.api_key},
params=params)
return response.json()
if __name__ == '__main__':
address = "TB592A5QwHvvcJoCmvALmzT3S9Pux91Gub"
tronscan = Tronscan(api_key='cc87d361-7cd6-4f69-a57b-f0a77a213355')
print(tronscan.transfer_trc20(address, trc20token_info["usdt"]["tokenId"]))

View File

@ -1,42 +0,0 @@
from flask import Flask, request, jsonify
from config import get_config
from services.order import OrderService
config = get_config()
app = Flask(__name__)
order_service = OrderService() # 获取单例实例
@app.route('/createOrder', methods=['POST'])
def create_order():
data = request.get_json()
phone = data.get('phone', None)
email = data.get('email', None)
address = data.get('address', None)
try:
payment_method = data['paymentMethod']
except KeyError:
return jsonify({
"message": "Unsupported payment method. Currently, only USDT payments are supported."
}), 400
addresses = order_service.get_user_addresses(phone, email, address, payment_method)
if not addresses:
return jsonify({
"message": "No payment address associated with you was found. Please provide a payment address."
}), 400
if len(addresses) == 1:
order_id = order_service.create_order(addresses[0])
return jsonify({"order_id": order_id}), 200
# 多个地址的情况
return jsonify({
"message": "请选择一个地址进行下单。",
"addresses": addresses
}), 200
if __name__ == '__main__':
app.run(debug=True)

View File

@ -1 +0,0 @@
from .utils import get_config

View File

@ -1,7 +0,0 @@
config = {
'user': 'your_mysql_username',
'password': 'your_mysql_password',
'host': 'localhost',
'database': 'your_database_name',
'autocommit': False,
}

View File

@ -1,13 +0,0 @@
[APIKey]
tronscan=cc87d361-7cd6-4f69-a57b-f0a77a213355
[PaymentAddresses]
usdt=TB592A5QwHvvcJoCmvALmzT3S9Pux91Gub
[MYSQL]
user: 'your_mysql_username'
password: 'your_mysql_password'
host: 'localhost'
database: 'your_database_name'
autocommit: false
allow_multi_statements: True

View File

@ -1,145 +0,0 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project payment
@File __init__.py.py
@IDE PyCharm
@Author rengengchen
@Time 2024/11/06 16:11
"""
import argparse
import os
import random
import sys
from argparse import Namespace
from configparser import ConfigParser
import requests
from loguru import logger
random_seed = 20240717
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def setup_seed(seed):
# import torch
# torch.manual_seed(seed)
# torch.cuda.manual_seed_all(seed)
# torch.backends.cudnn.deterministic = True
# import numpy as np
# np.random.seed(seed)
random.seed(seed)
setup_seed(random_seed)
class Setting:
def __init__(self,
config_parser: ConfigParser = None,
argument_parser: Namespace = None,
_visited=None,
_parent=None,
**kwargs):
self._parent = _parent
if config_parser:
for section in config_parser.sections():
section_config = Setting(_parent=self)
for option in config_parser.options(section):
section_config[option] = config_parser.get(section, option)
self.__dict__[section] = section_config
if argument_parser:
for k in vars(argument_parser):
self.__dict__[k] = getattr(argument_parser, k)
if _visited is None:
_visited = set()
self.update(_visited=_visited, **kwargs)
def update(self, _visited=None, **kwargs):
if _visited is None:
_visited = {}
for k, v in kwargs.items():
cls_attr = getattr(self.__class__, k, None)
if callable(cls_attr):
raise KeyError(f"The key '{k}' conflicts with an existing class method. you can use {k}_ instead.")
if isinstance(v, dict):
obj_id = id(v)
if obj_id in _visited:
logger.warning(f"Circular reference detected in key: '{k}'.")
v = _visited[obj_id]
else:
v = Setting(_visited=_visited, _parent=self, **v)
_visited[obj_id] = v
self.__dict__[k] = v
def get(self, item, default=None):
if item not in self.__dict__ and self._parent is not None:
return self._parent.get(item, default)
return default
def get_int(self, item):
return int(self.get(item))
def get_float(self, item):
return float(self.get(item))
def get_bool(self, item):
return bool(self.get(item))
def set(self, item, value):
self.__dict__[item] = value
def __getitem__(self, item):
return self.__dict__[item]
def __setitem__(self, key, value):
self.__dict__[key] = value
def __getattr__(self, key):
return self.get(key)
def __str__(self):
def _str_helper(config, indent=0, visited=None):
if visited is None:
visited = set()
lines = []
indent_str = ' ' * indent
for key, value in config.__dict__.items():
if key.startswith('_'):
continue
if isinstance(value, Setting):
if id(value) in visited:
lines.append(f"{indent_str}{key}: <Circular Reference>")
else:
visited.add(id(value))
lines.append(f"{indent_str}{key}:")
lines.append(_str_helper(value, indent + 1, visited))
else:
lines.append(f"{indent_str}{key}: {value}")
return '\n'.join(lines)
return _str_helper(self)
def log_config(config):
# fmt = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
# datefmt = "%Y-%m-%d %H:%M:%S"
logger.remove()
logger.add(sys.stdout, level=config.log_level)
logger.add(sys.stderr, level="ERROR")
logger.add(os.path.join(ROOT_DIR, "logs", "{time}.log"), level="DEBUG", encoding='utf8', rotation="100 MB",
retention=3)
def get_config(config_file=fr'{ROOT_DIR}/config/param.ini') -> Setting:
requests.adapters.DEFAULT_RETRIES = 3
configparser = ConfigParser()
configparser.read(config_file)
parser = argparse.ArgumentParser(description='payment system')
parser.add_argument("--seed", type=int, default=2024)
args = parser.parse_args()
config = Setting(configparser, args)
return config

View File

@ -1,17 +0,0 @@
def singleton(cls):
"""
Decorator for making a class a singleton.
This ensures that only one instance of the class exists.
"""
instances = {} # Dictionary to store the instance of the singleton class
def get_instance(*args, **kwargs):
"""
If an instance of the class does not exist, create one and store it.
If it exists, return the existing instance.
"""
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance

View File

@ -1,66 +0,0 @@
from loguru import logger
from mysql.connector import connect, Error, OperationalError
from mysql.connector import errors as db_errors
from custom_decorators import singleton
@singleton
class Database:
def __init__(self, config):
self.connection = None
self.config = config
self.connect()
def connect(self):
"""Establish a new database connection."""
try:
self.connection = connect(**self.config)
if self.connection.is_connected():
logger.info("Connected to MySQL database")
except Error as e:
logger.info(f"Error while connecting to MySQL: {e}")
self.connection = None
def get_connection(self):
"""Get the database connection, with reconnection logic."""
if self.connection is None or not self.connection.is_connected():
logger.info("Reconnecting to the database...")
self.connect()
return self.connection
def close_connection(self):
if self.connection and self.connection.is_connected():
self.connection.close()
logger.info("MySQL connection is closed")
def execute_query(self, query, params=None):
"""Execute a query with optional parameters, supports transactions."""
cursor = None
try:
connection = self.get_connection()
cursor = connection.cursor()
cursor.execute(query, params)
return cursor
except OperationalError as e:
logger.info(f"Operational error: {e}. Attempting to reconnect...")
self.connect()
cursor = self.get_connection().cursor()
cursor.execute(query, params)
return cursor
except db_errors.Error as e:
logger.info(f"Database error: {e}")
raise
finally:
if cursor:
cursor.close()
def commit(self):
"""Commit the current transaction."""
if self.connection:
self.connection.commit()
def rollback(self):
"""Rollback the current transaction."""
if self.connection:
self.connection.rollback()

View File

@ -1,93 +0,0 @@
from utils.database import pack_params
class User:
def __init__(self, id_=None, name=None, phone=None, email=None, address=None, payment_method=None):
self.id = id_
self.name = name
self.phone = phone
self.email = email
self.address = address
self.payment_method = payment_method
def insert_sql(self, params_format="list"):
params_sql, params = pack_params(params_format=params_format, param_sql="{param}", join_str=",",
name=self.name, phone=self.phone, email=self.email, address=self.address,
payment_method=self.payment_method)
return f"INSERT INTO user ({params_sql}) VALUES ({','.join('%s' for _ in params)})", params
def select_sql(self, condition="AND", params_format="list"):
params_sql, params = pack_params(params_format=params_format, param_sql="{param}=%s", join_str=f" {condition} ",
name=self.name, phone=self.phone, email=self.email, address=self.address,
payment_method=self.payment_method)
return f"SELECT id, name, phone, email, address, payment_method FROM user WHERE {params_sql}", params
def exists_sql(self, condition="AND", params_format="list"):
params_sql, params = pack_params(params_format=params_format, param_sql="{param}=%s", join_str=f" {condition} ",
name=self.name, phone=self.phone, email=self.email, address=self.address,
payment_method=self.payment_method)
return f"SELECT id FROM user WHERE {params_sql} LIMIT 1", params
def params(self, format="dict"):
if format == "list":
params = []
elif format == "dict":
params = {}
else:
raise ValueError("format must be list or dict")
if self.uid:
if format == "list":
params.append(self.uid)
elif format == "dict":
params["uid"] = self.uid
if self.name:
if format == "list":
params.append(self.name)
elif format == "dict":
params["name"] = self.name
if self.phone:
if format == "list":
params.append(self.phone)
elif format == "dict":
params["phone"] = self.phone
if self.email:
if format == "list":
params.append(self.email)
elif format == "dict":
params["email"] = self.email
if self.address:
if format == "list":
params.append(self.address)
elif format == "dict":
params["address"] = self.address
if self.payment_method:
if format == "list":
params.append(self.payment_method)
elif format == "dict":
params["payment_method"] = self.payment_method
return params
def get_difference(self, other_user):
different_attrs = {}
if self.uid != other_user.uid:
different_attrs["uid"] = (self.uid, other_user.uid)
if self.name != other_user.name:
different_attrs["name"] = (self.name, other_user.name)
if self.phone != other_user.phone:
different_attrs["phone"] = (self.phone, other_user.phone)
if self.email != other_user.email:
different_attrs["email"] = (self.email, other_user.email)
if self.address != other_user.address:
different_attrs["address"] = (self.address, other_user.address)
if self.payment_method != other_user.payment_method:
different_attrs["payment_method"] = (self.payment_method, other_user.payment_method)
return different_attrs
def __eq__(self, other):
if isinstance(other, User):
return ((self.name, self.phone, self.email, self.address, self.payment_method)
== (other.name, other.phone, other.email, other.address, other.payment_method))
return False
def __hash__(self):
return hash((self.name, self.phone, self.email, self.address, self.payment_method))

View File

@ -1,41 +0,0 @@
from ruamel_yaml.util import create_timestamp
from custom_decorators import singleton
from database import Database
from utils.datetime import current_timestamp
@singleton
class OrderRepository:
def __init__(self, config):
self.db = Database(config['MYSQL'])
def create(self, order_id, from_address, to_address):
cur_time = current_timestamp()
try:
self.db.execute_query(
"INSERT INTO orders (order_id, from_address, to_address, create_timestamp, update_timestamp) "
"VALUES (%s, %s, %s, %s, %s)",
[order_id, from_address, to_address, cur_time, cur_time]
)
self.db.commit()
except Exception:
self.db.rollback()
raise
def update_status(self, order_id, status):
try:
self.db.execute_query("UPDATE orders "
"SET status = %s, update_timestamp = %s "
"WHERE order_id = %s",
[status, current_timestamp(), order_id])
self.db.commit()
except Exception:
self.db.rollback()
raise
def get_order_info(self, order_id):
self.db.execute_query("SELECT quant, from_address, to_address, create_timestamp "
"FROM orders "
"WHERE order_id = %s",
[order_id])

View File

@ -1,74 +0,0 @@
import itertools
from custom_decorators import singleton
from database import Database
from models import User
@singleton
class UserRepository:
def __init__(self, config):
self.db = Database(config['MYSQL'])
def get_or_create(self, user):
users = []
cursor = self.db.execute_query(*user.select_sql(condition="OR"))
same_users = cursor.fetchall()
new_user = not len(same_users)
# 对用户已存在的属性判断是否有新属性
update_user = set()
update_sqls = []
update_params_list = []
delete_params = []
exist_conflicting_attr = False
for same_user in same_users:
exist_conflicting_attr = False
different_attrs = user.get_difference(same_user)
# 用于判断是否有新属性
update_sql_params = []
update_params = []
for k, v in different_attrs.items():
new_attr, exist_attr = v
if exist_attr is None:
setattr(same_user, k, new_attr)
update_sql_params.append(f"{k}=%s")
update_params.append(new_attr)
else:
# 出现冲突的属性,考虑新增一行记录
exist_conflicting_attr = True
break
if same_user in update_user:
delete_params.append((same_user.id,))
else:
users.append(same_user)
exist_new_attr = bool(update_params)
if exist_new_attr:
update_user.add(same_user)
update_sqls.append(f'UPDATE user SET {",".join(update_sql_params)} WHERE id=%s;')
update_params.append(same_user.id)
update_params_list.append(update_params)
sql_flag = False
try:
if delete_params:
sql_flag = True
self.db.get_connection().cursor().executemany("DELETE FROM user WHERE id=%s", delete_params)
if update_user:
sql_flag = True
self.db.get_connection().cursor().execute("".join(update_sqls),
list(itertools.chain.from_iterable(update_params_list)),
multi=True)
if sql_flag:
self.db.commit()
except Exception:
self.db.rollback()
raise
if new_user or exist_conflicting_attr:
try:
self.db.execute_query(*user.insert_sql())
self.db.commit()
except Exception:
self.db.rollback()
raise
users.append(user)
return users

View File

@ -1,3 +0,0 @@
loguru==0.7.2
mysql-connector-python==9.1.0
Requests==2.32.3

View File

@ -1,58 +0,0 @@
import uuid
from custom_decorators import singleton
from models import User
from repositories.order import OrderRepository
from repositories.user import UserRepository
from services.payment import PaymentService
from utils.datetime import current, current_timestamp, is_time_difference_greater_than
@singleton
class OrderService:
def __init__(self, config):
self.config = config
self.payment_service = PaymentService()
self.order_repo = OrderRepository(config)
self.user_repo = UserRepository(config)
def get_user_addresses(self, phone=None, email=None, address=None, payment_method=None):
if address is None:
if phone or email:
users = self.user_repo.get_or_create(User(phone=phone, email=email))
addresses = set(user.address for user in users if address)
return list(addresses)
raise ValueError('A phone number, email, or address is required.')
return [address]
def create_order(self, address=None):
date_str = current().strftime('%Y%m%d%H%M%S')
unique_id = str(uuid.uuid4()).split('-')[0]
order_id = f"{date_str}-{unique_id}"
self.order_repo.create(order_id, address,
self.config['PaymentAddresses'])
return order_id
def finish_order(self, order_id):
# 判断支付时间是否超过订单存活时间
quant, from_address, to_address, create_timestamp = self.order_repo.get_order_info(order_id)
current = current_timestamp()
status = 0
if is_time_difference_greater_than(create_timestamp, current, minutes=15):
# 订单超时
status = 4
else:
correct_quant, confirmed = self.payment_service.check_payment(quant, from_address, to_address, create_timestamp, current)
if correct_quant and confirmed:
# 支付成功
status = 1
elif correct_quant < 0:
# 没有转账
status = 2
elif confirmed:
# 金额不对
status = 3
if status:
self.order_repo.update_status(order_id, status)
return status

View File

@ -1,23 +0,0 @@
from api import Tronscan
from custom_decorators import singleton
from utils.datetime import current_timestamp
@singleton
class PaymentService:
def __init__(self, api_key):
self.tronscan = Tronscan(api_key)
def check_payment(self, quant, from_address, to_address, order_create_timestamp, end_timestamp=None):
if end_timestamp is None:
end_timestamp = current_timestamp()
result = self.tronscan.token_trc20_transfers(limit=100,
from_address=from_address, to_address=to_address,
start_timestamp=order_create_timestamp, end_timestamp=end_timestamp)
if result['rangeTotal'] == 0:
return -1, 0
token_transfers = result['token_transfers']
token_transfer = token_transfers[-1]
confirmed = token_transfer['confirmed']
correct_quant = int(quant == (token_transfer['quant'] / 6))
return correct_quant, confirmed

View File

@ -1,6 +0,0 @@
from custom_decorators import singleton
@singleton
class UserService:
pass

View File

@ -1,35 +0,0 @@
import unittest
from unittest.mock import patch
from api import Tronscan
class TestExternalAPICalls(unittest.TestCase):
def setUp(self):
# Setup code runs before every test method
self.tronscan = Tronscan(api_key='cc87d361-7cd6-4f69-a57b-f0a77a213355')
@unittest.skip("Skipping this test temporarily")
@patch('requests.get')
def test_real_api_call(self, mock_get):
# Mocking API response
mock_response = {
"data": [{"name": "ExampleToken", "symbol": "ETK"}]
}
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = mock_response
# Call the method
response = self.tronscan.search(term="example", type="token")
# Assert the response
self.assertEqual(response, mock_response)
mock_get.assert_called_once_with(
"https://apilist.tronscanapi.com/api/search/v2",
headers={'TRON-PRO-API-KEY': self.api_key},
params={"term": "example", "type": "token", "start": 0, "limit": 10}
)
if __name__ == "__main__":
unittest.main()

View File

@ -1,16 +0,0 @@
def pack_params(params_format="list", param_sql="{param}=%s", join_str=" AND ", **kwargs):
if params_format == "list":
params = []
elif params_format == "dict":
params = {}
else:
raise ValueError("Unknown params format")
param_sqls = []
for k, v in kwargs.items():
if v is not None:
if params_format == "list":
params.append(v)
elif params_format == "dict":
params[k] = v
param_sqls.append(param_sql.format(param=k))
return join_str.join(param_sqls), params

View File

@ -1,35 +0,0 @@
import datetime
def current():
return datetime.datetime.now()
def current_timestamp():
datetime.datetime.now().timestamp()
def is_time_difference_greater_than(timestamp1, timestamp2, hours=0, minutes=0, seconds=0):
"""
判断两个时间戳的时间差是否大于指定的小时分钟和秒数
参数:
timestamp1 (int): 第一个时间戳
timestamp2 (int): 第二个时间戳
hours (int): 要比较的小时数默认是0小时
minutes (int): 要比较的分钟数默认是0分钟
seconds (int): 要比较的秒数默认是0秒
返回:
bool: 如果时间差大于指定的小时分钟和秒数返回True否则返回False
"""
# 将时间戳转换为 datetime 对象
time1 = datetime.fromtimestamp(timestamp1)
time2 = datetime.fromtimestamp(timestamp2)
# 计算时间差
time_difference = abs(time2 - time1)
# 计算指定的时间差值
threshold = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
# 判断时间差是否大于指定的时间
return time_difference > threshold

View File

@ -1,2 +0,0 @@
def convert_to_tronscan_timestamp(timestamp):
return int(timestamp * 1000)

View File

@ -1,30 +0,0 @@
# Python:
*.ipynb
*/__pycache__
/.vagrant
/scrapy.iml
*.pyc
_trial_temp*
dropin.cache
docs/build
*egg-info
.tox
venv
build
dist
.idea
htmlcov/
.coverage
.pytest_cache/
.coverage.*
.cache/
.mypy_cache/
/tests/keys/localhost.crt
/tests/keys/localhost.key
key.txt
key.txt.pub
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini

View File

@ -1,131 +0,0 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>创建订单</title>
<style>
/* 简单的样式 */
.hidden { display: none; }
.error { color: red; }
</style>
</head>
<body>
<h1>创建订单</h1>
<form id="order-form">
<label for="phone">手机号:</label>
<input type="text" id="phone" name="phone"><br><br>
<label for="email">邮箱:</label>
<input type="email" id="email" name="email"><br><br>
<label for="address">地址:</label>
<input type="text" id="address" name="address"><br><br>
<button type="submit">提交订单</button>
</form>
<div id="message" class="hidden"></div>
<div id="addresses" class="hidden">
<h2>请选择一个地址:</h2>
<ul id="address-list"></ul>
</div>
<div id="error" class="error hidden"></div>
<script>
async function createOrder(phone = "", email = "", address = "") {
try {
const response = await fetch('/create_order', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ phone, email, address })
});
const data = await response.json();
if (response.ok) {
if (data.order_id) {
handleOrderSuccess(data.order_id);
} else if (data.addresses && Array.isArray(data.addresses)) {
handleMultipleAddresses(data.addresses);
}
} else {
if (data.message === "地址不能为空,请输入地址。") {
promptUserForAddress();
} else {
handleError(data.message || '发生未知错误。');
}
}
} catch (error) {
console.error('网络或服务器错误:', error);
handleError('网络或服务器错误,请稍后再试。');
}
}
function handleOrderSuccess(orderId) {
document.getElementById('message').innerText = `订单创建成功!您的订单号是:${orderId}`;
document.getElementById('message').classList.remove('hidden');
document.getElementById('addresses').classList.add('hidden');
document.getElementById('error').classList.add('hidden');
}
function handleMultipleAddresses(addresses) {
const addressList = document.getElementById('address-list');
addressList.innerHTML = ''; // 清空现有列表
addresses.forEach((addr, index) => {
const li = document.createElement('li');
const button = document.createElement('button');
button.innerText = `选择地址 ${index + 1}`;
button.addEventListener('click', () => {
// 选择地址后重新提交订单
createOrder(null, null, addr);
});
li.innerText = addr + ' ';
li.appendChild(button);
addressList.appendChild(li);
});
document.getElementById('addresses').classList.remove('hidden');
document.getElementById('message').classList.add('hidden');
document.getElementById('error').classList.add('hidden');
}
function promptUserForAddress() {
const address = prompt('地址不能为空,请输入您的地址:');
if (address && address.trim() !== "") {
createOrder(null, null, address.trim());
} else {
handleError('地址输入为空,无法创建订单。');
}
}
function handleError(message) {
const errorDiv = document.getElementById('error');
errorDiv.innerText = `错误:${message}`;
errorDiv.classList.remove('hidden');
document.getElementById('message').classList.add('hidden');
document.getElementById('addresses').classList.add('hidden');
}
// 绑定表单提交事件
document.getElementById('order-form').addEventListener('submit', function(event) {
event.preventDefault(); // 阻止表单默认提交行为
// 获取表单输入值
const phone = document.getElementById('phone').value.trim();
const email = document.getElementById('email').value.trim();
const address = document.getElementById('address').value.trim();
// 清除之前的消息
document.getElementById('message').classList.add('hidden');
document.getElementById('addresses').classList.add('hidden');
document.getElementById('error').classList.add('hidden');
// 调用createOrder函数
createOrder(phone, email, address);
});
</script>
</body>
</html>

View File

@ -1 +0,0 @@
console.log('Happy developing ✨')

View File

@ -1,10 +0,0 @@
{
"name": "payment_headend",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"private": true
}