制作一个 sol 钱包微信提醒

Sol 平台机器1

开始

使用 cursor 构建 sol 机器人

这里一般可以一步到位,然后通过不断给指令使得代码迭代

我用的 prompt 如下,通过一步步调试得到正确的代码:

请基于以下需求,生成一个用 Python 编写的程序,部署在阿里云函数计算上,定时执行以下任务:

  1. **获取 Solana 钱包余额**:

    • 使用 Solana RPC API。
    • 输入为用户的钱包地址,返回余额(单位为 SOL)。
  2. **获取 SOL 当前价格**:

    • 使用 CoinGecko API。
    • 返回 SOL 的实时价格(单位为美元)。
  3. **计算每日收益率**:

    • 对比今日余额和昨日余额,以及 SOL 的价格变化,计算收益率。
    • 收益率公式:`(今日余额 * 今日价格 - 昨日余额 * 昨日价格) / (昨日余额 * 昨日价格) * 100%`。
  4. **汇报功能**:

    • 使用 Server 酱 API 实现微信推送。
  5. **定时触发**:

    • 配置阿里云函数计算的定时触发器,每天早上 9 点运行。
    • 使用 Cron 表达式配置触发器。
  6. **代码要求**:

    • 结构清晰,包含必要的函数和注释。
    • 提供密钥配置位置(如 Solana RPC 节点 URL 和 Server 酱密钥)。
    • 返回汇报内容的完整格式示例:`"当前余额:10 SOL\n 当前价格:25 USD\n 收益率:+3.5%"`。
  7. **部署说明**:

    • 需要支持阿里云函数计算的直接部署

我需要你完成如下功能:
1.找到我钱包中的所有代币
2.计算所有代币中的价格

但是你还是没有计算这些代币的总价值按

程序还是没有获取这些代币的价格

还是获取不到价格
我希望添加如下功能:
1.将获取到的钱包内所有代币放入一个 json 文件,这样就不用每次额外修改代码添加了
2.对 json 文件中所有代币查询价格同时计算钱包总价值
3.添加有效的 api,能够有效查询代币价格

sol 代码用的函数及其平台4

sol 提醒平台代码思路5

得到的代码

# -*- coding: utf-8 -*-

import json
import logging
import os
import requests
from datetime import datetime, timedelta
import time

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('sol_bot.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger()

# 配置参数
SOLANA_RPC_NODES = [
    'https://solana-api.projectserum.com',
    'https://api.mainnet.rpcpool.com',
    'https://solana.public-rpc.com',
    'https://rpc.ankr.com/solana',
    'https://api.mainnet.rpcpool.com'
]
WALLET_ADDRESS = 'your wallet adress'
SERVER_CHAN_KEY = 'sever jiang api'

# 文件路径 (修改为相对路径)
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
HISTORY_FILE = os.path.join(CURRENT_DIR, 'sol_history.json')
TOKEN_CONFIG_FILE = os.path.join(CURRENT_DIR, 'token_config.json')

def get_working_rpc():
    """获取可用的 RPC 节点"""
    for rpc in SOLANA_RPC_NODES:
        try:
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getHealth",
            }
            response = requests.post(rpc, json=payload, timeout=5)
            if response.status_code == 200:
                logger.info(f"使用 RPC 节点: {rpc}")
                return rpc
        except Exception as e:
            logger.warning(f"RPC节点不可用 {rpc}: {str(e)}")
    return None

def get_sol_balance(wallet_address):
    """获取 Solana 钱包余额"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # 获取可用的 RPC 节点
            rpc_url = get_working_rpc()
            if not rpc_url:
                logger.error("没有可用的 RPC 节点")
                return None

            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getBalance",
                "params": [wallet_address]
            }
            response = requests.post(rpc_url, json=payload, timeout=10)
            result = response.json()
        
            if 'result' in result:
                balance = result['result']['value'] / 1e9
                return balance
            else:
                logger.error(f"获取余额失败: {result}")
        except Exception as e:
            logger.error(f"获取余额异常 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(1)  # 等待1秒后重试
    return None

def get_sol_price():
    """获取 SOL 当前价格(USD)"""
    try:
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            "ids": "solana",
            "vs_currencies": "usd"
        }
        response = requests.get(url, params=params)
        data = response.json()
        return data['solana']['usd']
    except Exception as e:
        logger.error(f"获取价格异常: {str(e)}")
        return None

def load_history():
    """加载历史数据"""
    try:
        if os.path.exists(HISTORY_FILE):
            with open(HISTORY_FILE, 'r') as f:
                return json.load(f)
    except Exception as e:
        logger.error(f"加载历史数据异常: {str(e)}")
    return None

def save_history(balance, price):
    """保存今日数据"""
    try:
        data = {
            'date': datetime.now().strftime('%Y-%m-%d'),
            'balance': balance,
            'price': price
        }
        with open(HISTORY_FILE, 'w') as f:
            json.dump(data, f)
    except Exception as e:
        logger.error(f"保存历史数据异常: {str(e)}")

def calculate_profit_rate(current_balance, current_price, yesterday_balance, yesterday_price):
    """计算收益率"""
    if not all([current_balance, current_price, yesterday_balance, yesterday_price]):
        return None
  
    today_value = current_balance * current_price
    yesterday_value = yesterday_balance * yesterday_price
  
    profit_rate = ((today_value - yesterday_value) / yesterday_value) * 100
    return profit_rate

def send_wechat_notification(message):
    """发送微信通知"""
    try:
        url = f"https://sctapi.ftqq.com/{SERVER_CHAN_KEY}.send"
        params = {
            "title": "Solana钱包日报",
            "desp": message
        }
        response = requests.post(url, data=params)
        if response.status_code == 200:
            logger.info("微信通知发送成功")
        else:
            logger.error(f"微信通知发送失败: {response.text}")
    except Exception as e:
        logger.error(f"发送通知异常: {str(e)}")

def get_usd_cny_rate():
    """获取美元兑人民币汇率"""
    try:
        url = "https://api.exchangerate-api.com/v4/latest/USD"
        response = requests.get(url)
        data = response.json()
        return data['rates']['CNY']
    except Exception as e:
        logger.error(f"获取汇率异常: {str(e)}")
        return 7.2  # 如果API失败,使用一个默认汇率

def get_token_accounts(wallet_address):
    """获取钱包中所有代币账户"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            # 获取可用的 RPC 节点
            rpc_url = get_working_rpc()
            if not rpc_url:
                logger.error("没有可用的 RPC 节点")
                return []

            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getTokenAccountsByOwner",
                "params": [
                    wallet_address,
                    {
                        "programId": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
                    },
                    {
                        "encoding": "jsonParsed"
                    }
                ]
            }
        
            logger.info(f"正在请求钱包 {wallet_address} 的代币信息...")
            response = requests.post(rpc_url, json=payload, timeout=10)
        
            if not response.ok:
                logger.error(f"请求失败: {response.status_code}")
                continue
            
            result = response.json()
            if 'error' in result:
                logger.error(f"RPC错误: {result['error']}")
                continue

            token_balances = []
            accounts = result.get('result', {}).get('value', [])
        
            logger.info(f"获取到 {len(accounts)} 个账户记录")
        
            for account in accounts:
                try:
                    parsed = account['account']['data']['parsed']['info']
                    mint = parsed['mint']
                    balance = float(parsed['tokenAmount']['uiAmount'] or 0)
                
                    if balance > 0:  # 只记录有余额的代币
                        token_balances.append({
                            'mint': mint,
                            'balance': balance
                        })
                        logger.info(f"找到代币: {mint}, 余额: {balance}")
                except (KeyError, TypeError, ValueError) as e:
                    logger.error(f"解析代币数据异常: {str(e)}")
                    continue
        
            # 添加原生 SOL 余额
            sol_balance = get_sol_balance(wallet_address)
            if sol_balance:
                token_balances.append({
                    'mint': 'So11111111111111111111111111111111111111112',
                    'balance': sol_balance
                })
                logger.info(f"添加原生SOL余额: {sol_balance}")
        
            logger.info(f"总共找到 {len(token_balances)} 个有效代币账户")
            return token_balances

        except Exception as e:
            logger.error(f"获取代币账户异常 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(1)  # 等待1秒后重试
  
    return []

def get_token_name(mint):
    """获取代币名称"""
    TOKEN_NAMES = {
        'So11111111111111111111111111111111111111112': 'SOL',
        'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v': 'USDC',
        '7zA7992Wyu3EFg2sBGBkvcyzNw5kekk5xdQ7BC7hpump': 'PUMP',
        'MEW1gQWJ3nEXg2qgERiKu7FAFj79PHvQVREQUzScPP5': 'MEW',
        '6Nc4x2cCarACkQznheFxwGM4yRxiCsFSc4QHpaP1pump': 'PUMP1',
        '4Pps3jzbu9avH9rn4bZyMVmTWYySqCYzAXCpC1bLpump': 'PUMP2',
        '4jrjM8GVXW2QnKBVdaogj7zfjre1JBcwNC2BEKp2pump': 'PUMP3',
        'E2VZsWdGniyT7Hnc9HVsSTBs41gGUthbZTvg9YHVpump': 'PUMP4',
        'CTg3ZgYx79zrE1MteDVkmkcGniiFrK1hJ6yiabropump': 'PUMP5',
        'Bw5K8eZaf361uDLHgX2UUn1PNfC7XtgQVvY9sSappump': 'PUMP6',
        'a6typipgpjG1jZzFCe2Wp83m7KUX7e9cc7RebMCpump': 'PUMP7',
        'A5ck2iW8LanfYSTgn9iea9wPKJGnQ2ypqeB4JMWpump': 'PUMP8',
        'NnH578YYaP9RKLvi9phBq6wLjXcevYGgQLkRDo6pump': 'PUMP9',
        '5voS9evDjxF589WuEub5i4ti7FWQmZUUGq5tiJxcqj9': 'BONK',
        'CAPYD6Lrm7bTZ6C7t7JvSxvpEcfKQ9YNB7kUjh6p6XBN': 'CAPY',
        'FkBF9u1upwEMUPxnXjcydxxVSxgr8f3k1YXbz7G7bmtA': 'FOXY',
        '89XyhKVhY19Q1UPPVw4hh7DFEqQ3GGAvWWoGScaZdmfJ': 'MYRO'
    }
    return TOKEN_NAMES.get(mint, f"Unknown({mint[:6]}...)")

def get_token_info_from_birdeye(token_mint):
    """从 Birdeye 获取代币信息"""
    try:
        # 使用 Birdeye 的 token metadata API
        url = f"https://public-api.birdeye.so/public/token_metadata?address={token_mint}"
        headers = {
            "X-API-KEY": "c7e99d65e0ad4c0e8e3bf81be1397fcf"
        }
    
        # 添加重试机制
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(url, headers=headers, timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    if data.get('success') and data.get('data'):
                        token_data = data['data']
                        symbol = token_data.get('symbol', '')
                        name = token_data.get('name', '')
                    
                        # 记录获取到的信息
                        logger.info(f"获取代币信息 - Mint: {token_mint}")
                        logger.info(f"Symbol: {symbol}, Name: {name}")
                    
                        return {
                            'symbol': symbol,
                            'name': name
                        }
                break
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    logger.error(f"获取代币信息失败 {token_mint}: {str(e)}")
                time.sleep(1)  # 失败后等待1秒再重试
            
        # 如果API调用失败,尝试使用预定义的映射
        KNOWN_TOKENS = {
            '3WPep4ufaToK1aS5s8BL9inzeUrt4DYaQCiic6ZkkC1U': {'symbol': 'GIKO', 'name': 'GIKO'},
            'GxWyh17McYtY3h9m75nyTdy8Cp6WAzkdyNRDPKmjHU4c': {'symbol': 'BAPE', 'name': 'Based Ape'},
            'MEW1gQWJ3nEXg2qgERiKu7FAFj79PHvQVREQUzScPP5': {'symbol': 'MEW', 'name': 'MEW'},
            'Bw5K8eZaf361uDLHgX2UUn1PNfC7XtgQVvY9sSappump': {'symbol': 'SAPP', 'name': 'SAPP'},
            'J4ywFdm8H7hjwKzCaEQujhkDRfCnRviVnHMvFNDAoLNQ': {'symbol': 'KING', 'name': 'KING'},
            '6ogzHhzdrQr9Pgv6hZ2MNze7UrzBMAFyBBWUYp1Fhitx': {'symbol': 'POPCAT', 'name': 'POPCAT'},
            # 添加更多已知代币...
        }
    
        if token_mint in KNOWN_TOKENS:
            logger.info(f"使用预定义映射 - Mint: {token_mint}, Symbol: {KNOWN_TOKENS[token_mint]['symbol']}")
            return KNOWN_TOKENS[token_mint]
        
    except Exception as e:
        logger.error(f"获取代币信息异常 {token_mint}: {str(e)}")
    return None

def update_token_config(new_tokens):
    """更新代币配置文件"""
    config_file = 'token_config.json'
    try:
        # 读取现有配置
        if os.path.exists(config_file):
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
        else:
            config = {"tokens": {}}

        # 添加新代币
        updated = False
        for token in new_tokens:
            mint = token['mint']
            if mint not in config['tokens']:
                # 获取代币信息
                token_info = get_token_info_from_birdeye(mint)
            
                if token_info and token_info['symbol']:
                    symbol = token_info['symbol']
                    logger.info(f"成功获取代币信息: {mint} -> {symbol}")
                else:
                    symbol = f"Unknown({mint[:6]}...)"
                    logger.warning(f"无法获取代币信息,使用默认名称: {mint} -> {symbol}")
            
                config['tokens'][mint] = {
                    "symbol": symbol,
                    "coingecko_id": "",
                    "birdeye_id": mint
                }
                updated = True
                logger.info(f"添加新代币到配置: {mint} ({symbol})")
            else:
                # 检查是否需要更新现有代币的信息
                current_symbol = config['tokens'][mint]['symbol']
                if current_symbol.startswith('Unknown'):
                    token_info = get_token_info_from_birdeye(mint)
                    if token_info and token_info['symbol']:
                        config['tokens'][mint]['symbol'] = token_info['symbol']
                        updated = True
                        logger.info(f"更新代币信息: {mint} -> {token_info['symbol']}")

        # 保存更新后的配置
        if updated:
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
            logger.info("代币配置已更新")

        return config
    except Exception as e:
        logger.error(f"更新代币配置异常: {str(e)}")
        return {"tokens": {}}

def get_birdeye_prices(token_mints):
    """从 Birdeye 批量获取代币价格"""
    try:
        url = "https://public-api.birdeye.so/public/multi_price"
        headers = {
            "X-API-KEY": "c7e99d65e0ad4c0e8e3bf81be1397fcf",  # 这是一个免费的API key
            "Content-Type": "application/json"
        }
    
        prices = {}
        # 将代币列表分块处理,每次最多100个
        chunk_size = 100
        for i in range(0, len(token_mints), chunk_size):
            chunk = token_mints[i:i + chunk_size]
            params = {
                "list_address": chunk
            }
        
            response = requests.post(url, headers=headers, json=params)
            if response.status_code == 200:
                data = response.json()
                if data.get('success') and data.get('data'):
                    for mint, price_data in data['data'].items():
                        if price_data.get('value'):
                            price = float(price_data['value'])
                            prices[mint] = price
                            logger.info(f"获取到价格 {get_token_name(mint)}: ${price:.6f}")
    
        return prices
    except Exception as e:
        logger.error(f"获取Birdeye价格异常: {str(e)}")
        return {}

def get_dexscreener_price(token_mint):
    """从 DexScreener 获取代币价格"""
    try:
        url = f"https://api.dexscreener.com/latest/dex/tokens/{token_mint}"
        response = requests.get(url)
        data = response.json()
    
        if data.get('pairs') and len(data['pairs']) > 0:
            # 使用第一个交易对的价格
            price = float(data['pairs'][0]['priceUsd'])
            logger.info(f"从DexScreener获取到价格 {token_mint}: ${price:.6f}")
            return price
        return 0
    except Exception as e:
        logger.error(f"获取DexScreener价格异常: {str(e)}")
        return 0

def main():
    """主函数"""
    try:
        logger.info("开始获取钱包信息...")
    
        # 1. 获取所有代币余额
        token_accounts = get_token_accounts(WALLET_ADDRESS)
        if not token_accounts:
            logger.error("未获取到任何代币信息")
            return
        
        logger.info(f"找到 {len(token_accounts)} 个代币账户")
    
        # 2. 更新代币配置
        config = update_token_config(token_accounts)
    
        # 3. 获取SOL原生余额
        sol_balance = get_sol_balance(WALLET_ADDRESS)
        if sol_balance:
            token_accounts.append({
                'mint': 'So11111111111111111111111111111111111111112',
                'balance': sol_balance
            })
    
        # 4. 获取所有代币价格
        token_mints = [token['mint'] for token in token_accounts]
        token_prices = get_birdeye_prices(token_mints)
    
        # 5. 使用DexScreener补充缺失的价格
        for mint in token_mints:
            if mint not in token_prices or token_prices[mint] == 0:
                price = get_dexscreener_price(mint)
                if price > 0:
                    token_prices[mint] = price
    
        # 6. 生成报告
        usd_cny_rate = get_usd_cny_rate()
        total_usd_value = 0
        token_values = []
    
        for token in token_accounts:
            mint = token['mint']
            balance = token['balance']
            price = token_prices.get(mint, 0)
            usd_value = balance * price
            cny_value = usd_value * usd_cny_rate
        
            symbol = config['tokens'].get(mint, {}).get('symbol', f"Unknown({mint[:6]}...)")
            token_values.append({
                'symbol': symbol,
                'balance': balance,
                'price': price,
                'usd_value': usd_value,
                'cny_value': cny_value
            })
            total_usd_value += usd_value
    
        # 按价值排序
        token_values.sort(key=lambda x: x['usd_value'], reverse=True)
    
        # 7. 生成报告内容
        report = "💰 Solana钱包资产报告 💰\n\n"
        report += f"总资产: ${total_usd_value:.2f} / ¥{total_usd_value * usd_cny_rate:.2f}\n\n"
        report += "代币明细:\n"
    
        for token in token_values:
            if token['price'] > 0:  # 只显示有价格的代币
                report += f"----------------------------\n"
                report += f"🪙 {token['symbol']}:\n"
                report += f"数量: {token['balance']:.4f}\n"
                report += f"价格: ${token['price']:.6f}\n"
                report += f"价值: ${token['usd_value']:.2f} / ¥{token['cny_value']:.2f}\n"
                report += f"占比: {(token['usd_value']/total_usd_value*100):.2f}%\n"
    
        # 添加无价格数据的代币统计
        no_price_tokens = [token for token in token_values if token['price'] == 0]
        if no_price_tokens:
            report += f"\n❓ 未能获取价格的代币 ({len(no_price_tokens)}个):\n"
            for token in no_price_tokens:
                report += f"{token['symbol']}: {token['balance']:.4f}\n"
    
        # 8. 发送通知
        logger.info("准备发送报告...")
        send_wechat_notification(report)
        logger.info("报告发送完成")
    
    except Exception as e:
        logger.error(f"执行异常: {str(e)}")
        raise

if __name__ == "__main__":
    try:
        logger.info("开始执行...")
        main()
        logger.info("执行完成")
    except KeyboardInterrupt:
        logger.info("程序已停止")
    except Exception as e:
        logger.error(f"程序异常: {str(e)}")

代码部署

注册

代码部署使用 aws lambda

  • 首先注册 aws

    • aws 注册需要的东西

      • 邮箱

      • 信用卡或者借记卡

        • 可以上淘宝买
        • 这里填完了信用卡以后,用自己手机接收一个消息就可以注册账号了

创建函数

image

进入 AWS lambda

image

image

创建函数之后,需要将代码文件放入 lambda_functions 中,但是,代码文件中可能会缺少需要的 package,这时候需要通过层上传代码包6,我认为层相当于补丁.

输入函数

通过编写代码创建函数

image

通过传输 zip 包创建代码

image

  • 需要注意的是 zip 包需要包含 lambda_function.py 函数,也就是主函数,即需要运行的代码

将层添加进函数中

image

image

添加触发器

image

image

image

image

这里询问 ai 即可搞定

设置超时时间

代码运行超过 3s 时,需要提高超时时间配置

image

image

结束

这些内容全程通过 AI 进行辅助,通过 cursor 的辅助,这些功能可以快速生成,可以想到的是,随着 ai 的发展,社会中,技术变得越来越廉价,想法变得越来越重要,有创新的想法,可以很快实施.


  1. Sol 平台机器

    控制面板 _ Lambda1

  2. 控制面板 _ Lambda

    原文地址 https://eu-north-1.console.aws.amazon.com/lambda/home?region=eu-north-1#/discover

    We could not load the content for the page. This might be due to your firewall or proxy server blocking certain domains used to load this page content.

    Refresh the page, and if you continue to experience this error, contact AWS Support.

  3. 🔥 永久免费使用 cursor 教程,白嫖到底! - Kelen

    原文地址 https://www.kelen.cc/posts/always-free-cursor

    Cursor 是一款集成 AI 技术的代码编辑器,由 Anysphere 实验室开发,基于 VSCode 深度定制。支持多种编程语言,并内置了 GPT-4 等 AI 模型,提供智能代码补全、代码生成、代码编辑和聊天功能。

    个人使用上 Cursor 的代码补全功能并非 VSCode 的 AI 编码插件能比的,如果想要了解更多免费的 VSCode 编码插件,可以查阅这篇文章 几款 AI 编程助手,大幅提高你的开发体验

    通过官网可以得知,Cursor 提供以下几种订阅模式:

    • Hobby 计划:免费,包含两周的 Pro 试用期,每月 2000 个代码补全等
    • Pro 计划:每月 20 美元,提供无限制的代码补全和高级请求。
    • Business 计划:每月 40 美元,提供额外的数据保留等服务。

    对于我这种穷比来说每个月 20 美元肯定是付不起,废话不多说,接下来分享两种免费白嫖方案。

    第一种方案(勥推荐)

    当我们的账号使用完 Cursor 免费版本额度或者时间到期了之后,我们可以再次进入官网登录账号,点击 Advanced​,点击 Delete Account​ 删除,输入 delete 点击确认 ,然后用原来的账号再次注册一遍,神奇的事情发生了,官方又会当做新的账号给你分配新的免费额度。嘿嘿~🤤🤤🤤

    第二种方案

    由于注册账号只需要邮箱即可,我们只要邮箱数量足够多,那就可以一直免费使用。可以注册一个 2925 无限邮箱,就可以拥有无限别名邮箱。🌝🌝

    比如你注册的邮箱账号是 kelencc@2925.com​。那么账号注册完成之后,在前缀的 kelencc​ 后面增加任何字符的邮箱地址的邮件,kelencc123@2925.com​,kelencc321@2925.com​ 都会自动被这个工具接收。

    接下来的操作就不用我说了吧,这种方法唯一缺点就是编辑器的插件不能远程同步。 不过不影响,毕竟本地用,插件还在,用完即弃。

    免费限制解决

    现在 cursor 限制了同一台机子使用多次免费试用账号,如果使用上述的方法提示错误:Too many free trial accounts used on this machine,可以尝试使用下面两种方法。

    删除 machineid

    在 Macos 上,尝试删除 ~/Library/Application Support/Cursor/machineid​ 这个文件,重新启动或者重新安装,其他操作系统也有类似的文件,可以自行查找。

    如果不能解决,使用下面第二种方法。👇

    Cursor Fake Machine

    通过大神写的插件 Cursor Fake Machine 来生成一个 machineid ,步骤如下:

    • 下载插件,点击地址 https://www.cursorfake.com/blog/how-to-use-zh ,然后手动安装插件,具体安装方法我相信你会的。
    • 打开命令面板,快捷键是 Ctrl + Shift + P​ ,输入 fake​ 即可。
    • 重启或者重装 Cursor 即可。

    终极解决方案

    可以参考这篇文章,解决 Cursor 免费试用限制问题

    总结

    Cursor 开启了强大的 AI 编程新体验,我愿意称之为史上最强,大家赶紧用起来,不过别太依赖于代码自动生成,用久了编码技术都落后了,避免被人工智能取代了!

  4. sol 代码用的函数及其平台

    1. Solana RPC API
    def get_sol_balance(wallet_address):
        """获取 Solana 钱包的 SOL 余额"""
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getBalance",
            "params": [wallet_address]
        }
        response = requests.post(SOLANA_RPC_URL, json=payload)
    
    1. Birdeye API (用于获取代币价格)
    def get_birdeye_prices(token_mints):
        """从 Birdeye 批量获取代币价格"""
        url = "https://public-api.birdeye.so/public/multi_price"
        headers = {
            "X-API-KEY": "your_api_key",
            "Content-Type": "application/json"
        }
    
    1. DexScreener API (作为备用价格源)
    def get_dexscreener_price(token_mint):
        """从 DexScreener 获取代币价格"""
        url = f"https://api.dexscreener.com/latest/dex/tokens/{token_mint}"
    
    1. ExchangeRate API (获取美元兑人民币汇率)
    def get_usd_cny_rate():
        """获取美元兑人民币汇率"""
        url = "https://api.exchangerate-api.com/v4/latest/USD"
    
    1. Server 酱 API (发送微信通知)
    def send_wechat_notification(message):
        """发送微信通知"""
        url = f"https://sctapi.ftqq.com/{SERVER_CHAN_KEY}.send"
    

    整体代码流程:

    1. 获取钱包中的 SOL 余额
    2. 获取钱包中所有代币账户信息
    3. 从 Birdeye 获取代币价格
    4. 如果 Birdeye 没有价格,从 DexScreener 获取备用价格
    5. 获取美元兑人民币汇率
    6. 计算总资产价值
    7. 生成资产报告
    8. 通过 Server 酱 发送微信通知

    主要功能:

    def lambda_handler(event, context):
        try:
            # 1. 获取代币余额
            token_accounts = get_token_accounts(WALLET_ADDRESS)
            
            # 2. 获取 SOL 余额
            sol_balance = get_sol_balance(WALLET_ADDRESS)
            
            # 3. 获取代币价格
            token_prices = get_birdeye_prices(token_mints)
            
            # 4. 补充 DexScreener 价格
            for mint in token_mints:
                if mint not in token_prices:
                    price = get_dexscreener_price(mint)
                    
            # 5. 获取汇率
            usd_cny_rate = get_usd_cny_rate()
            
            # 6. 计算总价值并生成报告
            # ...
            
            # 7. 发送通知
            send_wechat_notification(report)
    

    这个代码的主要目的是:

    • 监控 Solana 钱包资产
    • 计算资产总价值(美元和人民币)
    • 生成详细的资产报告
    • 通过微信推送通知

    建议改进:

    1. 添加错误重试机制
    2. 使用私有 RPC 节点提高稳定性
    3. 缓存汇率数据减少 API 调用
    4. 添加更多价格源以提高准确性
    5. 优化通知内容的格式
  5. sol 提醒平台代码思路

    1. 基础配置

    # 配置关键参数
    SOLANA_RPC_URL = 'https://api.mainnet-beta.solana.com'  # Solana 网络接入点
    WALLET_ADDRESS = '你的钱包地址'  # 要监控的钱包
    SERVER_CHAN_KEY = '你的Server酱密钥'  # 微信推送服务的密钥
    

    2. 主要功能模块

    2.1 获取 SOL 余额

    def get_sol_balance(wallet_address):
        """
        通过 Solana RPC 接口获取原生 SOL 代币余额
        返回值单位是 SOL(而不是 lamports)
        """
    

    2.2 获取所有代币余额

    def get_token_accounts(wallet_address):
        """
        获取钱包中所有 SPL 代币的余额
        使用 getTokenAccountsByOwner 方法
        返回一个列表,包含所有代币的铸币地址和余额
        """
    

    2.3 获取代币价格

    def get_birdeye_prices(token_mints):
        """
        从 Birdeye API 批量获取代币价格
        Birdeye 是 Solana 生态中的代币价格聚合器
        """
    
    def get_dexscreener_price(token_mint):
        """
        从 DexScreener 获取单个代币价格
        作为 Birdeye 的备用数据源
        """
    

    2.4 汇率转换

    def get_usd_cny_rate():
        """
        获取最新的美元兑人民币汇率
        用于将美元价值转换为人民币
        """
    

    2.5 消息推送

    def send_wechat_notification(message):
        """
        使用 Server酱 服务推送微信通知
        可以将资产报告实时推送到手机
        """
    

    3. 主流程(Lambda Handler)

    def lambda_handler(event, context):
        """
        主函数流程:
        1. 获取所有代币账户信息
        2. 获取 SOL 余额
        3. 获取所有代币的价格
        4. 计算总资产价值
        5. 生成资产报告
        6. 发送通知
        """
    

    4. 数据处理流程

    1. 收集数据

      • 获取 SOL 余额
      • 获取所有 SPL 代币余额
      • 获取代币价格
      • 获取美元汇率
    2. 计算价值

      • 计算每个代币的美元价值
      • 计算人民币价值
      • 计算总资产价值
    3. 生成报告

      • 总资产概览
      • 各代币详细信息
      • 占比分析

    5. 特点和优势

    1. 多源数据

      • 使用多个价格源确保数据可靠性
      • 支持备用数据源
    2. 错误处理

      • 每个功能都有异常处理
      • 关键操作有重试机制
    3. 灵活配置

      • 可配置的监控地址
      • 可自定义通知内容
    4. 实用功能

      • 自动过滤小额资产
      • 按价值排序显示
      • 支持美元和人民币双币种显示

    6. 使用建议

    1. 建议使用私有 RPC 节点提高稳定性
    2. 定期检查 API 密钥的有效性
    3. 可以添加更多的价格数据源
    4. 考虑添加缓存机制减少 API 调用
    5. 可以根据需要调整通知频率

    这个程序可以帮助用户实时监控他们的 Solana 钱包资产,并通过微信获取资产变动通知。适合用于资产监控、投资跟踪等场景。

  6. lambda 使用缺少的包

    原文地址 https://jumping-code.com/2021/07/28/aws-lambda-python-packages/

    本文紀錄如何在 AWS Lambda 上安裝並使用 Python 第三方套件,步驟包含在本機先建立套件的 zip 檔,以及新增 Layer 到 Lambda 函式上

    Lambda 函式預設無法使用 Python 第三方套件

    我在 AWS Lambda 撰寫好函式(或是使用 $ zip <dest_filename>.zip `<py_file>`.py​ 製作 python zip 檔後上傳函式),函式中有使用到 requests 這個套件,執行 Test 時會顯示 No module named 'requests'​,原因就是 AWS Lambda 預設是沒有 requests 這個套件的,需要另外上傳套件檔,以下是解決方法。

    1. 安裝所需套件至 python 資料夾
    2. 打包 python 套件資料夾為 zip 檔
    3. 建立新 Layer
    4. 將 Layer 新增至函式

    前兩步驟是在本機完成,後兩步驟是在 AWS 上進行,接下來將詳細解說各步驟。

    1. 安裝所需套件至 python 資料夾

    AWS 官方文件說額外的套件必須使用 「python」 這個名稱的資料夾打包,所以要在專案資料夾內建立一個 python 資料夾,並將套件安裝到裡面,詳細 Terminal 指令如下:

    $ mkdir python
    $ cd python
    
    # 安裝單一套件
    $ pip install --target . requests
    
    # 一次安裝多個套件
    $ pip install --target . -r requirements.txt
    
    
    

    2. 打包 python 套件資料夾為 zip 檔

    cd 回到原本專案的資料夾,將剛剛的 python 套件資料夾打包成 zip 檔,我這邊取名為 dependencies.zip,指令如下

    $ cd ..
    $ zip -r dependencies.zip ./python
    
    
    

    打包完成後的資料夾階層

    3. 建立新 Layer

    接下來換 AWS 平台上場!

    進入 AWS Lambda 後,在側欄點選「其他資源」的「Layer」,並點選「建立 Layer」,就會進入到下圖的畫面,請依照圖片中的順序進行設定,上傳檔案為剛剛所建立的 dependencies.zip​ 。

    這邊要注意的是第五步的「相容的執行時間」,一定要選擇原本函式的程式碼所設定的 Python 版本,我的程式是使用 Python 3.8 執行,因此選擇執行時間(Runtime)就必須選 Python 3.8,否則後面要新增 Layer 到函式時會沒有相符的可以選。

    4. 將 Layer 新增至函式

    建立完 Layer 之後從側欄回到「函式」,我的函式名稱是 stock.py​:

    import requests
    
    def main(event, context):
        resp = requests.get('https://histock.tw/stock/public.aspx')
        return resp.ok
    
    
    

    (event, context 是 AWS Lambda 必須要有的固定參數)

    函式的「處理常式」必須設定為 <pyfile_name>.<function_name>​,所以我就要將原本預設的變更為 stock.main​, 如下圖,設定完成後點選「新增 Layer」

    點選後會進入下圖的畫面,「Layer source」選擇「自訂 Layer」,下方會出現下拉式選單,選取剛剛建立的 Layer 與版本號,我的因為測試時也是用同個名稱 requests,所以最新版是版本 5 了。

    點擊新增後,就可以回到函式,會發現最底下的 Layer 已經出現剛剛所新增的 requests 了!

    最後再次點 Test,就會發現函式程式碼順利執行,沒有再出現 No module named 'requests'​的問題了!

    延伸閱讀

    學會 AWS Lambda 的操作之後,可以來看看怎麼把 FastAPI 部署到 AWS 的 API Gateway 上囉!

    參考資料

    歡迎追蹤我的 Facebook 粉專:https://www.facebook.com/jumpingcoder

  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    546 引用 • 672 回帖 • 2 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...