双色球组合推荐脚本

氧化钙的菜🐔园地 每天进步亿点点,他日定可出任CEO,迎娶白富美,走上人生巅峰! 本文由博客端 http://www.kidcaoblog.com 主动推送

双色球脚本作用

我观察了几期,发现蓝区号码经常是“15”、“05”。所以就想着能不能统计号码抽中次数,然后作为一组号码去购买双色球,这样总比随机来的强一点。

本脚本用 python 语言实现

项目组织

数据库

创建一个叫“double_color_ball”的数据库

再创建三张表

# 创建 open_number_count 用于存放号码被抽中的次数
CREATE TABLE `open_number_count` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `open_number` varchar(2) NOT NULL COMMENT '开奖号码',
  `red_ball_times` int(11) DEFAULT NULL COMMENT '号码出现在蓝球区的次数',
  `blue_ball_times` int(11) NOT NULL COMMENT '号码出现在蓝球区的次数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=199 DEFAULT CHARSET=utf8

# 创建 recommend_records 用于存放推荐的组合
CREATE TABLE `recommend_records` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `open_date` varchar(20) DEFAULT NULL COMMENT '开奖日期',
  `open_nums` varchar(20) NOT NULL COMMENT '开奖期号',
  `recommend_nums` varchar(200) NOT NULL COMMENT '推荐组合,格式[(1,2,3,4,5,6),7],[(2,3,4,5,6,7),8]',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

# 创建 open_records,用于存放往期开奖结果
CREATE TABLE `open_records` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `open_date` varchar(20) NOT NULL COMMENT '开奖日期',
  `open_nums` varchar(20) NOT NULL COMMENT '开奖期号',
  `red_numbers` varchar(100) NOT NULL COMMENT '开奖的红区数字,通过空格分隔',
  `blue_number` varchar(2) NOT NULL COMMENT '开奖的蓝区数字',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8

脚本结构

image.png

base:
  base_function: 类中包含初始化数据库、查找开奖号码、计算推荐组合号码等函数
  log_function: 日志文件类
  mysql_functions:类中包含 mysql 数据库增删改查函数
  requests_functions:类中包含获取开奖号码、调用 “server酱“ 通知等函数

config:
  base_config:目前只用于存放 server 酱的 token
  database_config:用于存放连接 mysql 数据库的基础信息

main.py:程序主入口
main.shell:执行shell,到时候配合 crontab 一起使用,定时推荐组合号码
pipfile:用于管理所用到的库

流程

初始化数据库

首先需要知道过往的开奖号码,找到一个 api,输入开奖期号就会返回指定期号到目前最新期号的开奖信息

api 连接

需要再默认拼接上期号,例如“21041”。

获取到的开奖号码存放到 “open_records”中,同时每找到一个红蓝区号码,都需要到 “open_number_count” 表中更新抽中次数。

在每次推荐组合号码之前,都会更新最新的开奖信息到数据库再推荐。

推荐组合号码

目前推荐的组合有两种:

  1. 根据过往开奖,统计红区最常抽中的 6 个号码,蓝区 1 个,作为一组推荐。
  2. 随机生成 6 个红区号码,1 个蓝区号码。

后续可能会加入因子影响随机生成的组合,形成第三种推荐,但是没有很好的头绪,欢迎大家留言交流意见。

开始编码

logs_functions.py

class LogFunctions(object):
    """
    初始化 log 文件
    """

    def __init__(self, logger_name):
        logger = logging.getLogger(logger_name)
        fmter = logging.Formatter(
            "%(asctime)s %(filename)s %(funcName)s [line:%(lineno)d]%(levelname)s %(message)s")

        log_file_name = os.path.join("./logs", "{}.txt".format(logger_name))
        hdlr = logging.FileHandler(log_file_name, mode='a')
        hdlr.setLevel(logging.INFO)
        hdlr.setFormatter(fmter)
        logger.addHandler(hdlr)
        logger.setLevel(logging.INFO)

    @classmethod
    def get_logger(cls, logger_name):
        return logging.getLogger(logger_name)

日志文件目前是存放在项目下的“logs”文件夹中,如果有需要可以自行修改“log_file_name”变量

具体使用方法:

# 生成对象的同时也会自动创建文件
LogFunctions("mysql_functions_logs")

# 获取 mysql_functions_logs 文件的对象
self.logger = LogFunctions.get_logger("mysql_functions_logs")

mysql_functions.py

这个类实现了 mysql 的增删改查功能.

需要注意的点:

  1. select 函数中,因为实现了可以自定义返回多少行数据,所以实际的结果中有两个 tuple 嵌套在一起的,大概是这样的:((0,)),实际 0 才是我们需要的数据,在使用时需要注意这一点。
  2. 关于 “mysql_init_database_object”中的 SSH 登陆,需要注意由于端口转发,数据库实际对外端口不再是 3306 了,需要连接上服务器后使用转发的端口
    port=server.local_bind_port
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from sshtunnel import SSHTunnelForwarder
from pymysql import connect, DatabaseError, cursors, ProgrammingError, Error, InterfaceError
from base.log_functions import LogFunctions
import sys


class MysqlFunctions(object):
    def __init__(self, login_way, database_config):
        """
        :param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh)
        :param database_config: 数据库登录的配置
        """

        LogFunctions("mysql_functions_logs")
        self.logger = LogFunctions.get_logger("mysql_functions_logs")

        self._db = None
        self._cursor = None

        self.mysql_init_database_object(login_way, database_config)

    def mysql_init_database_object(self, login_way, database_config):
        """
        初始化数据库的链接,返回数据库对象
        :param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh)
        :param database_config: 数据库登录的配置
        :return: 返回数据库对象
        """
        if login_way.lower() == "password":
            self._db = connect(host=database_config.get("db_host"), user=database_config.get("db_user"), password=database_config.get("db_passwd"),
                               database=database_config.get("db_name"), port=database_config.get("db_port"))
            self._cursor = self._db.cursor()

        elif login_way.upper() == "SSH":
            server = SSHTunnelForwarder(
                # (database_config.get("ssh_host"), database_config.get("ssh_port")),
                ssh_address_or_host=database_config.get("ssh_host"),
                ssh_port=database_config.get("ssh_port"),
                ssh_password=database_config.get("ssh_password"),
                ssh_pkey=database_config.get("keyfile"),
                ssh_username=database_config.get("ssh_user"),
                remote_bind_address=(database_config.get("db_host"), database_config.get("db_port"))
            )
            server.start()

            self._db = connect(
                host='127.0.0.1',
                port=server.local_bind_port,
                user=database_config.get("db_user"),
                passwd=database_config.get("db_passwd"),
                database=database_config.get("db_name"),
                charset="utf8",
                cursorclass=cursors.DictCursor)

            self._cursor = self._db.cursor()
        else:
            self.logger.error("The login method is not implemented, the program will be exited")
            sys.exit()

    def mysql_get_current_database_name(self):
        """
        获取当前指定的数据库名称
        :return: 数据库名称
        """
        return self.mysql_execute_sql("select database()")

    def mysql_close_cursor_connection(self):
        """
        关闭 cursor 的连接
        :return: 无
        """
        self._cursor.close()
        self.logger.info("mysql cursor close connection")

    def mysql_close_mysql_connection(self):
        """
        关闭数据库连接

        注意:当 socket 关闭后,执行什么方法都是引起 error,慎用!
        :return: 无
        """
        try:
            self._db.close()
            self.logger.info("close mysql socket connection")
        except Error as e:
            self.logger.error("mysql connection already closed:{}".format(e))
            sys.exit()

    def mysql_set_database(self, database_name):
        """
        重新指定数据库
        :param database_name: 数据库名称
        :return: 无
        """
        try:
            self._cursor.execute("use {}".format(database_name))
            self._db.commit()
            self.logger.info("set database:{}".format(database_name))
        except ProgrammingError as e:
            self.logger.error("set database fail, database_name error:{}".format(e))
            sys.exit()

    def mysql_execute_sql(self, sql):
        """
        执行非增删查改sal语句
        :param sql: SQL语句
        :return: 返回执行结果
        """
        self.logger.info("execute other sql:{}".format(sql))
        try:
            self._cursor.execute(sql)
            data = self._cursor.fetchall()
            self._db.commit()
            self.logger.info("execute other sql success!")
            self.logger.info("execute result:{}".format(data))
            return data
        except DatabaseError as e:
            self._db.rollback()
            self.logger.error("execute other sql error:" + str(e))
        except InterfaceError as e:
            self.logger.error("mysql socket maybe close, error information:{}".format(e))

    def mysql_insert_data(self, table_name, values, fields=None):
        """
        插入数据
        :param table_name: 表名,不能为空
        :param values: 值tuple,不允许为空
        :param fields: 字段tuple,可以为空,默认插入全部数据
        :return: 无返回
        """
        # 判断 values 是否为空
        if not values:
            self.logger.error("insert_data(), values is empty")
            return

        if fields:
            insert_sql = "insert into {}{}".format(table_name, str(fields)).replace("'", "")
            insert_sql = insert_sql + "values{}".format(str(values))
        else:
            insert_sql = "insert into {} values{};".format(table_name, str(values))

        self.logger.info("insert into sql:" + insert_sql)
        try:
            self._cursor.execute(insert_sql)
            self._db.commit()
            self.logger.info("insert into success!")
        except DatabaseError as e:
            self._db.rollback()
            self.logger.error("insert into error:" + str(e))

    def mysql_update_data(self, table_name, field, value, where_condition=None):
        """
        更新单条数据
        :param table_name: 表名,不能为空
        :param field: 更新字段,需要传入string
        :param value: 更新的值,需要传入string
        :param where_condition: 匹配条件,目前仅支持 where 匹配
        :return: 无返回
        """

        if where_condition is None:
            update_sql = "update {} set {} = {};".format(table_name, field, value)
        else:
            update_sql = "update {} set {} = {} where {};".format(table_name, field, value, where_condition)

        self.logger.info("update sql: {}".format(update_sql))

        try:
            self._cursor.execute(update_sql)
            self._db.commit()
            self.logger.info("update sql success!")
        except DatabaseError as e:
            self._db.rollback()
            self.logger.error("update error:" + str(e))

    def mysql_select_data(self, table_name, return_num=1, want_get_info="*", where_condition=None,
                          having_condition=None,
                          order_condition=None, limit_condition=None):
        """
        查询数据,返回数据信息
        :param table_name: 表名,不允许为空,需要传入一个string
        :param return_num: 返回数据多少行,默认一条
        :param want_get_info: 想获取的信息,默认为全部"*",需要传入一个tuple
        :param where_condition: where 匹配条件,默认为空,需要传入一个string
        :param having_condition: having 匹配条件,默认为空,需要传入一个string
        :param order_condition: order 匹配条件,默认为空,需要传入一个string
        :param limit_condition: limit 匹配条件,默认为空,需要传入一个string
        :return: 返回结果
        """
        select_sql = "select {} from {}".format(str(want_get_info), table_name)
        if where_condition:
            select_sql = select_sql + " where {}".format(str(where_condition))

        if having_condition:
            select_sql = select_sql + " having {}".format(str(having_condition))

        if order_condition:
            select_sql = select_sql + " order by {}".format(str(order_condition))

        if limit_condition:
            select_sql = select_sql + " limit {}".format(str(limit_condition))

        select_sql = select_sql + ";"

        try:
            # 执行查询
            self._cursor.execute(select_sql)

            # 返回指定数据
            return self._cursor.fetchmany(int(return_num))
        except DatabaseError as e:
            self._db.rollback()
            self.logger.error("select error: {}".format(str(e)))

requests_functions.py

由于当初敲代码没有考虑的很周全,所以把查询开奖信息的功能写到“base_functions”类中,应该放到 request_functions 类中的。

这个类实现了调用 server 酱的服务,可以给「企业微信、微信、钉钉」等发消息,这里贴个连接,有兴趣可以了解一下,很简单的。server 酱

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import requests
from config.base_config import STF_PUSH_TOKEN


class RequestsFunctions(object):

    @classmethod
    def messages_push(cls, message_title, message_text):
        """
        使用 server 酱发送推荐组合通知,加上定时任务就不用手动运行了。

        默认从 base_config 获取 push_token 有多少个就发多少次
        :param message_title: 消息的通知
        :param message_text: 消息的内容
        :return: 无
        """
        for x in STF_PUSH_TOKEN:
            url = "https://sctapi.ftqq.com/{}.send".format(x)

            push_data = {
                "title": message_title,
                "desp": message_text
            }

            requests.post(url=url, data=push_data)

base_functions.py

这里有几点需要注意的:

  1. 在抓取网页的数据时,推荐使用“bs4.BeautifulSoup”,非常简单易用。由于返回的数据是字符串,所以为了方便直接使用了切片的方式分隔数据。
  2. 函数“init_database”由于是在很不情愿的状态下敲的(误操作把数据库重装了。。。大家一定要养成定时备份的习惯!),所以可能有坑,大家注意一下
  3. 如果在开奖当天执行,有可能网页还没来得及更新数据(开奖号码有了,但是奖金信息没有不全,导致切片失败),所以建议第二天再执行。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from bs4 import BeautifulSoup
from base.mysql_functions import MysqlFunctions
from config.database_config import DOCKER_MYSQL
from base.requests_functions import RequestsFunctions
import requests
import re
import datetime
import random


class BASEFUNCTIONS(object):
    """
    实现基础功能
    """

    def __init__(self):
        self.mysql = MysqlFunctions("password", DOCKER_MYSQL)

    def update_open_records(self):
        """
        更新开奖记录,需要实现自动补全功能
        :return:
        """
        # 先从数据库查出最新的开奖期号
        db_open_nums = self.mysql.mysql_select_data("open_records", want_get_info="open_nums",
                                                    order_condition="id desc")

        # 把编号加1传给insert_data
        self.insert_data(int(db_open_nums[0][0]) + 1)

    def insert_data(self, db_open_nums):
        """
        存数据到数据库中
        :return:
        """
        open_nums = ""
        open_date = ""
        red_nums = ""
        blue_nums = ""

        url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(db_open_nums)

        req = requests.get(url=url)

        # 转码一下
        req.encoding = "utf-8"

        # 找到所有tr标签的数据
        find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1")

        results = []
        for x in range(len(find_tr)):
            results.append(re.findall(r"\d+", find_tr[x].text))

        for y in range(1, len(find_tr) + 1):
            result = results[-y]
            try:
                open_nums = result[0][:5]
                open_date = result[8][-4:] + "-" + result[9] + "-" + result[10]
            except IndexError as e:
                print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧")
                exit()

            temp = 5
            for z in range(6):
                # 找到每个号码
                find_number = result[0][temp:temp + 2]
                red_nums = red_nums + find_number + " "

                # 更新这个号码出现的次数
                # red_ball_show_times格式:(0,)
                red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
                                                                   where_condition="open_number = {}".format(
                                                                       find_number),
                                                                   want_get_info="red_ball_times")[0][0]
                self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1,
                                             where_condition="open_number = {}".format(find_number),
                                             field="red_ball_times")
                temp += 2

            blue_nums = result[0][-2:]
            blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
                                                                where_condition="open_number = {}".format(
                                                                    blue_nums),
                                                                want_get_info="blue_ball_times")[0][0]
            self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1,
                                         where_condition="open_number = {}".format(blue_nums),
                                         field="blue_ball_times")

            # 开始插入数据
            records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date",
                                                          where_condition='open_date = "{}"'.format(open_date))

            if len(records_result) == 0:
                self.mysql.mysql_insert_data(table_name="open_records",
                                             values=(open_date, open_nums, red_nums, blue_nums),
                                             fields=("open_date", "open_nums", "red_numbers", "blue_number"))

            # 重制下变量
            red_nums = ""
            blue_nums = ""

    def recommend_numbers(self, recommend_count):
        """
        推荐6个红球和1个蓝球号码。

        第一组:根据记录,找到最常出现的6个红球,1个蓝球
        第二~组:完全是随机的。
        :param recommend_count: 需要推荐组的数量
        :return: 返回一个list,每一组推荐都是一个tuple
        """
        # 从 open_number_count 表通过排序方式找到6个红球号码和一个蓝球号码
        recommend_numbers = []
        red_balls = []
        usual_red_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number",
                                                       order_condition="red_ball_times desc", return_num=6)

        usual_blue_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number",
                                                        order_condition="blue_ball_times desc", return_num=1)

        for x in usual_red_balls:
            red_balls.append(int(x[0]))

        # 进行排序
        red_balls_sort = self.numbers_sort(red_balls)

        # 把多次出现的推荐组合添加到数组中
        recommend_numbers.append((red_balls_sort, int(usual_blue_balls[0][0])))

        # 把随机推荐的组合添加到数组中
        recommend_numbers.append(self.get_random_recommend_numbers(recommend_count))

        # 把数据插入到数据库
        new_open_info = self.calculation_open_date_and_nums()

        # 为了避免多次执行导致插入多条,这里做个判断,如果数据已存在"open_nums",那么就执行更新,不插入
        is_exists = self.mysql.mysql_select_data(table_name="recommend_records", want_get_info="open_nums",
                                                 where_condition="open_nums = {}".format(new_open_info[1]))

        if len(is_exists) > 0:
            self.mysql.mysql_update_data(table_name="recommend_records", field="recommend_nums",
                                         value=str(recommend_numbers),
                                         where_condition="open_nums = {}".format(new_open_info[1]))
        else:
            self.mysql.mysql_insert_data(table_name="recommend_records",
                                         fields=("open_date", "open_nums", "recommend_nums"),
                                         values=(*new_open_info, str(recommend_numbers)))

        # 发送一下推荐组合
        RequestsFunctions.messages_push(message_title="期号:'{}' 推荐组合".format(new_open_info[1]),
                                        message_text=str(recommend_numbers))

        return recommend_numbers

    def get_random_recommend_numbers(self, recommend_count):
        """
        获取随机推荐的号码
        :param recommend_count: 需要推荐组的数量
        :return: 返回N组推荐号码,是一个list
        """
        # 红区是6位「1~33」,蓝区是1位「1~16」
        random_recommend_list = []

        for x in range(int(recommend_count)):
            red_balls = []

            while True:
                if len(red_balls) == 6:
                    break

                red_number = random.randint(1, 33)

                if red_number in red_balls:
                    continue
                else:
                    red_balls.append(red_number)

            # 使用冒泡排序把数据整理一下,从大到小排列
            self.numbers_sort(red_balls)

            blue_ball = random.randint(1, 16)

            random_recommend_list.append((red_balls, blue_ball))

        return random_recommend_list

    def numbers_sort(self, nums: list):
        """
        给数据排序,使用冒泡排序算法
        :param nums: 数据,需要传入一个list
        :return: 返回排序好的list,从大到小
        """
        for x in range(len(nums)):
            for y in range(0, len(nums) - 1):
                if nums[y] > nums[y + 1]:
                    nums[y], nums[y + 1] = nums[y + 1], nums[y]

        return nums

    def calculation_open_date_and_nums(self):
        """
        计算开奖日期和期号
        :return: 返回一个list,0:开奖日期,1:是开奖期号
        """
        # 由于会先更新数据,再推荐,所以直接取最新的开奖期号和开奖日期
        db_last_open_info = \
            self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date, open_nums",
                                         order_condition="id desc", return_num=1)[0]

        # 开奖期号+1即可,开奖日期在星期二、四、日,所以需要获取当前时间,然后再确定+2还是+3
        new_nums = str(int(db_last_open_info[1]) + 1)

        # 获取当前日期
        current_weekday = datetime.datetime.now().weekday() + 1

        db_last_open_date = datetime.datetime.strptime(db_last_open_info[0], "%Y-%m-%d").date()

        new_open_date = ""
        if current_weekday in (1, 2):
            new_open_date = db_last_open_date + datetime.timedelta(days=+2)
        elif current_weekday in (3, 4):
            new_open_date = db_last_open_date + datetime.timedelta(days=+2)
        elif current_weekday in (5, 6, 7):
            new_open_date = db_last_open_date + datetime.timedelta(days=+3)

        return [str(new_open_date), new_nums]

    def init_database(self, open_num):
        """
        初始化数据库,插入open_records,open_number_count
        :param open_num: 开奖的期号
        :return:
        """
        # 先给 open_number_count 初始化数据
        for num in range(1, 34):
            self.mysql.mysql_insert_data(table_name="open_number_count",
                                         fields=("open_number", "red_ball_times", "blue_ball_times"),
                                         values=(str(num), 0, 0))

        open_nums = ""
        open_date = ""
        red_nums = ""
        blue_nums = ""

        url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(open_num)

        req = requests.get(url=url)

        # 转码一下
        req.encoding = "utf-8"

        # 找到所有tr标签的数据
        find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1")

        results = []
        for x in range(len(find_tr)):
            results.append(re.findall(r"\d+", find_tr[x].text))

        for y in range(1, len(find_tr) + 1):
            result = results[-y]
            try:
                open_nums = result[0][:5]
                open_date = result[8][-4:] + "-" + result[9] + "-" + result[10]
            except IndexError as e:
                print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧")
                exit()

            temp = 5
            for z in range(6):
                # 找到每个号码
                find_number = result[0][temp:temp + 2]
                red_nums = red_nums + find_number + " "

                # 更新这个号码出现的次数
                # red_ball_show_times格式:(0,)
                red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
                                                                   where_condition="open_number = {}".format(
                                                                       find_number),
                                                                   want_get_info="red_ball_times")[0][0]
                self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1,
                                             where_condition="open_number = {}".format(find_number),
                                             field="red_ball_times")
                temp += 2

            blue_nums = result[0][-2:]
            blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
                                                                where_condition="open_number = {}".format(
                                                                    blue_nums),
                                                                want_get_info="blue_ball_times")[0][0]
            self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1,
                                         where_condition="open_number = {}".format(blue_nums),
                                         field="blue_ball_times")

            # 开始插入数据
            records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date",
                                                          where_condition='open_date = "{}"'.format(open_date))

            if len(records_result) == 0:
                self.mysql.mysql_insert_data(table_name="open_records",
                                             values=(open_date, open_nums, red_nums, blue_nums),
                                             fields=("open_date", "open_nums", "red_numbers", "blue_number"))
            else:
                self.mysql.mysql_update_data(table_name="open_records",
                                             value=str(blue_nums),
                                             field="blue_number",
                                             where_condition='open_date = "{}"'.format(open_date))

            # 重制下变量
            red_nums = ""
            blue_nums = ""

main.py

不解释了

#!/usr/bin/env python3                         
# -*- coding:utf-8 -*-                         
                                               
from base.base_functions import BASEFUNCTIONS  
import os                                      
                                               
if __name__ == '__main__':                     
    # 删掉logs文件                                 
    logs_path = "./logs/mysql_functions_logs.tx
    if os.path.exists(logs_path):              
        os.remove(logs_path)                   
                                               
    bf = BASEFUNCTIONS()                       
    bf.update_open_records()                   
    print(bf.recommend_numbers(2))

main.shell

这里说两句:

  1. pipenv run 执行时会自动进入虚拟环境,执行后自动退出虚拟环境
  2. 如果把代码上传到代码托管平台,gitlab、gtiee、github 等,切记一定要使用 igitignore 不然会后悔的!!!
  3. pipenv install 类似 "requirements.txt" 但是比他好用多了,推荐使用 pipenv 管理虚拟环境,安装库,管理 python 版本,功能强大,你值得拥有!
#!/bin/bash

cd /root/kidcao/double_color_ball

# 更新脚本
git checkout master
git pull origin master

# 进入虚拟环境且安装库
pipenv run pipenv install

# 进入虚拟环境且执行脚本
pipenv run python3.9 main.py

添加定时任务

centos 的定时任务需要安装 crontab,所以先安装他

yum install crontabs

crontab 常用参数:

使用 crontab -e,然后在最末位添加:0 18 * * 1,3,5 bash /具体路径/double_color_ball/main.shell,即可

  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    501 引用 • 666 回帖 • 254 关注
  • requests
    2 引用 • 3 回帖
  • 双色球
    1 引用 • 1 回帖
  • 脚本
    20 引用 • 42 回帖 • 1 关注

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • kidcao
    作者

    自顶 doge