双色球脚本作用
我观察了几期,发现蓝区号码经常是“15”、“05”。所以就想着能不能统计号码抽中次数,然后作为一组号码去购买双色球,这样总比随机来的强一点。
本脚本用 python 语言实现
项目组织
数据库
创建一个叫“double_color_ball”的数据库
再创建三张表
# 创建 open_number_count, 用于存放号码被抽中的次数
CREATE TABLE `open_number_count` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`open_number` varchar(2) NOT NULL COMMENT '开奖号码',
`red_ball_times` int(11) DEFAULT NULL COMMENT '号码出现在蓝球区的次数',
`blue_ball_times` int(11) NOT NULL COMMENT '号码出现在蓝球区的次数',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=199 DEFAULT CHARSET=utf8
# 创建 recommend_records, 用于存放推荐的组合
CREATE TABLE `recommend_records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`open_date` varchar(20) DEFAULT NULL COMMENT '开奖日期',
`open_nums` varchar(20) NOT NULL COMMENT '开奖期号',
`recommend_nums` varchar(200) NOT NULL COMMENT '推荐组合,格式[(1,2,3,4,5,6),7],[(2,3,4,5,6,7),8]',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8
# 创建 open_records,用于存放往期开奖结果
CREATE TABLE `open_records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`open_date` varchar(20) NOT NULL COMMENT '开奖日期',
`open_nums` varchar(20) NOT NULL COMMENT '开奖期号',
`red_numbers` varchar(100) NOT NULL COMMENT '开奖的红区数字,通过空格分隔',
`blue_number` varchar(2) NOT NULL COMMENT '开奖的蓝区数字',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8
脚本结构
base:
base_function: 类中包含初始化数据库、查找开奖号码、计算推荐组合号码等函数
log_function: 日志文件类
mysql_functions:类中包含 mysql 数据库增删改查函数
requests_functions:类中包含获取开奖号码、调用 “server酱“ 通知等函数
config:
base_config:目前只用于存放 server 酱的 token
database_config:用于存放连接 mysql 数据库的基础信息
main.py:程序主入口
main.shell:执行shell,到时候配合 crontab 一起使用,定时推荐组合号码
pipfile:用于管理所用到的库
流程
初始化数据库
首先需要知道过往的开奖号码,找到一个 api,输入开奖期号就会返回指定期号到目前最新期号的开奖信息
需要再默认拼接上期号,例如“21041”。
获取到的开奖号码存放到 “open_records”中,同时每找到一个红蓝区号码,都需要到 “open_number_count” 表中更新抽中次数。
在每次推荐组合号码之前,都会更新最新的开奖信息到数据库再推荐。
推荐组合号码
目前推荐的组合有两种:
- 根据过往开奖,统计红区最常抽中的 6 个号码,蓝区 1 个,作为一组推荐。
- 随机生成 6 个红区号码,1 个蓝区号码。
后续可能会加入因子影响随机生成的组合,形成第三种推荐,但是没有很好的头绪,欢迎大家留言交流意见。
开始编码
logs_functions.py
class LogFunctions(object):
"""
初始化 log 文件
"""
def __init__(self, logger_name):
logger = logging.getLogger(logger_name)
fmter = logging.Formatter(
"%(asctime)s %(filename)s %(funcName)s [line:%(lineno)d]%(levelname)s %(message)s")
log_file_name = os.path.join("./logs", "{}.txt".format(logger_name))
hdlr = logging.FileHandler(log_file_name, mode='a')
hdlr.setLevel(logging.INFO)
hdlr.setFormatter(fmter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)
@classmethod
def get_logger(cls, logger_name):
return logging.getLogger(logger_name)
日志文件目前是存放在项目下的“logs”文件夹中,如果有需要可以自行修改“log_file_name”变量
具体使用方法:
# 生成对象的同时也会自动创建文件
LogFunctions("mysql_functions_logs")
# 获取 mysql_functions_logs 文件的对象
self.logger = LogFunctions.get_logger("mysql_functions_logs")
mysql_functions.py
这个类实现了 mysql 的增删改查功能.
需要注意的点:
- select 函数中,因为实现了可以自定义返回多少行数据,所以实际的结果中有两个 tuple 嵌套在一起的,大概是这样的:((0,)),实际 0 才是我们需要的数据,在使用时需要注意这一点。
- 关于 “mysql_init_database_object”中的 SSH 登陆,需要注意由于端口转发,数据库实际对外端口不再是 3306 了,需要连接上服务器后使用转发的端口
port=server.local_bind_port
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from sshtunnel import SSHTunnelForwarder
from pymysql import connect, DatabaseError, cursors, ProgrammingError, Error, InterfaceError
from base.log_functions import LogFunctions
import sys
class MysqlFunctions(object):
def __init__(self, login_way, database_config):
"""
:param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh)
:param database_config: 数据库登录的配置
"""
LogFunctions("mysql_functions_logs")
self.logger = LogFunctions.get_logger("mysql_functions_logs")
self._db = None
self._cursor = None
self.mysql_init_database_object(login_way, database_config)
def mysql_init_database_object(self, login_way, database_config):
"""
初始化数据库的链接,返回数据库对象
:param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh)
:param database_config: 数据库登录的配置
:return: 返回数据库对象
"""
if login_way.lower() == "password":
self._db = connect(host=database_config.get("db_host"), user=database_config.get("db_user"), password=database_config.get("db_passwd"),
database=database_config.get("db_name"), port=database_config.get("db_port"))
self._cursor = self._db.cursor()
elif login_way.upper() == "SSH":
server = SSHTunnelForwarder(
# (database_config.get("ssh_host"), database_config.get("ssh_port")),
ssh_address_or_host=database_config.get("ssh_host"),
ssh_port=database_config.get("ssh_port"),
ssh_password=database_config.get("ssh_password"),
ssh_pkey=database_config.get("keyfile"),
ssh_username=database_config.get("ssh_user"),
remote_bind_address=(database_config.get("db_host"), database_config.get("db_port"))
)
server.start()
self._db = connect(
host='127.0.0.1',
port=server.local_bind_port,
user=database_config.get("db_user"),
passwd=database_config.get("db_passwd"),
database=database_config.get("db_name"),
charset="utf8",
cursorclass=cursors.DictCursor)
self._cursor = self._db.cursor()
else:
self.logger.error("The login method is not implemented, the program will be exited")
sys.exit()
def mysql_get_current_database_name(self):
"""
获取当前指定的数据库名称
:return: 数据库名称
"""
return self.mysql_execute_sql("select database()")
def mysql_close_cursor_connection(self):
"""
关闭 cursor 的连接
:return: 无
"""
self._cursor.close()
self.logger.info("mysql cursor close connection")
def mysql_close_mysql_connection(self):
"""
关闭数据库连接
注意:当 socket 关闭后,执行什么方法都是引起 error,慎用!
:return: 无
"""
try:
self._db.close()
self.logger.info("close mysql socket connection")
except Error as e:
self.logger.error("mysql connection already closed:{}".format(e))
sys.exit()
def mysql_set_database(self, database_name):
"""
重新指定数据库
:param database_name: 数据库名称
:return: 无
"""
try:
self._cursor.execute("use {}".format(database_name))
self._db.commit()
self.logger.info("set database:{}".format(database_name))
except ProgrammingError as e:
self.logger.error("set database fail, database_name error:{}".format(e))
sys.exit()
def mysql_execute_sql(self, sql):
"""
执行非增删查改sal语句
:param sql: SQL语句
:return: 返回执行结果
"""
self.logger.info("execute other sql:{}".format(sql))
try:
self._cursor.execute(sql)
data = self._cursor.fetchall()
self._db.commit()
self.logger.info("execute other sql success!")
self.logger.info("execute result:{}".format(data))
return data
except DatabaseError as e:
self._db.rollback()
self.logger.error("execute other sql error:" + str(e))
except InterfaceError as e:
self.logger.error("mysql socket maybe close, error information:{}".format(e))
def mysql_insert_data(self, table_name, values, fields=None):
"""
插入数据
:param table_name: 表名,不能为空
:param values: 值tuple,不允许为空
:param fields: 字段tuple,可以为空,默认插入全部数据
:return: 无返回
"""
# 判断 values 是否为空
if not values:
self.logger.error("insert_data(), values is empty")
return
if fields:
insert_sql = "insert into {}{}".format(table_name, str(fields)).replace("'", "")
insert_sql = insert_sql + "values{}".format(str(values))
else:
insert_sql = "insert into {} values{};".format(table_name, str(values))
self.logger.info("insert into sql:" + insert_sql)
try:
self._cursor.execute(insert_sql)
self._db.commit()
self.logger.info("insert into success!")
except DatabaseError as e:
self._db.rollback()
self.logger.error("insert into error:" + str(e))
def mysql_update_data(self, table_name, field, value, where_condition=None):
"""
更新单条数据
:param table_name: 表名,不能为空
:param field: 更新字段,需要传入string
:param value: 更新的值,需要传入string
:param where_condition: 匹配条件,目前仅支持 where 匹配
:return: 无返回
"""
if where_condition is None:
update_sql = "update {} set {} = {};".format(table_name, field, value)
else:
update_sql = "update {} set {} = {} where {};".format(table_name, field, value, where_condition)
self.logger.info("update sql: {}".format(update_sql))
try:
self._cursor.execute(update_sql)
self._db.commit()
self.logger.info("update sql success!")
except DatabaseError as e:
self._db.rollback()
self.logger.error("update error:" + str(e))
def mysql_select_data(self, table_name, return_num=1, want_get_info="*", where_condition=None,
having_condition=None,
order_condition=None, limit_condition=None):
"""
查询数据,返回数据信息
:param table_name: 表名,不允许为空,需要传入一个string
:param return_num: 返回数据多少行,默认一条
:param want_get_info: 想获取的信息,默认为全部"*",需要传入一个tuple
:param where_condition: where 匹配条件,默认为空,需要传入一个string
:param having_condition: having 匹配条件,默认为空,需要传入一个string
:param order_condition: order 匹配条件,默认为空,需要传入一个string
:param limit_condition: limit 匹配条件,默认为空,需要传入一个string
:return: 返回结果
"""
select_sql = "select {} from {}".format(str(want_get_info), table_name)
if where_condition:
select_sql = select_sql + " where {}".format(str(where_condition))
if having_condition:
select_sql = select_sql + " having {}".format(str(having_condition))
if order_condition:
select_sql = select_sql + " order by {}".format(str(order_condition))
if limit_condition:
select_sql = select_sql + " limit {}".format(str(limit_condition))
select_sql = select_sql + ";"
try:
# 执行查询
self._cursor.execute(select_sql)
# 返回指定数据
return self._cursor.fetchmany(int(return_num))
except DatabaseError as e:
self._db.rollback()
self.logger.error("select error: {}".format(str(e)))
requests_functions.py
由于当初敲代码没有考虑的很周全,所以把查询开奖信息的功能写到“base_functions”类中,应该放到 request_functions 类中的。
这个类实现了调用 server 酱的服务,可以给「企业微信、微信、钉钉」等发消息,这里贴个连接,有兴趣可以了解一下,很简单的。server 酱
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import requests
from config.base_config import STF_PUSH_TOKEN
class RequestsFunctions(object):
@classmethod
def messages_push(cls, message_title, message_text):
"""
使用 server 酱发送推荐组合通知,加上定时任务就不用手动运行了。
默认从 base_config 获取 push_token 有多少个就发多少次
:param message_title: 消息的通知
:param message_text: 消息的内容
:return: 无
"""
for x in STF_PUSH_TOKEN:
url = "https://sctapi.ftqq.com/{}.send".format(x)
push_data = {
"title": message_title,
"desp": message_text
}
requests.post(url=url, data=push_data)
base_functions.py
这里有几点需要注意的:
- 在抓取网页的数据时,推荐使用“bs4.BeautifulSoup”,非常简单易用。由于返回的数据是字符串,所以为了方便直接使用了切片的方式分隔数据。
- 函数“init_database”由于是在很不情愿的状态下敲的(误操作把数据库重装了。。。大家一定要养成定时备份的习惯!),所以可能有坑,大家注意一下
- 如果在开奖当天执行,有可能网页还没来得及更新数据(开奖号码有了,但是奖金信息没有不全,导致切片失败),所以建议第二天再执行。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from bs4 import BeautifulSoup
from base.mysql_functions import MysqlFunctions
from config.database_config import DOCKER_MYSQL
from base.requests_functions import RequestsFunctions
import requests
import re
import datetime
import random
class BASEFUNCTIONS(object):
"""
实现基础功能
"""
def __init__(self):
self.mysql = MysqlFunctions("password", DOCKER_MYSQL)
def update_open_records(self):
"""
更新开奖记录,需要实现自动补全功能
:return:
"""
# 先从数据库查出最新的开奖期号
db_open_nums = self.mysql.mysql_select_data("open_records", want_get_info="open_nums",
order_condition="id desc")
# 把编号加1传给insert_data
self.insert_data(int(db_open_nums[0][0]) + 1)
def insert_data(self, db_open_nums):
"""
存数据到数据库中
:return:
"""
open_nums = ""
open_date = ""
red_nums = ""
blue_nums = ""
url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(db_open_nums)
req = requests.get(url=url)
# 转码一下
req.encoding = "utf-8"
# 找到所有tr标签的数据
find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1")
results = []
for x in range(len(find_tr)):
results.append(re.findall(r"\d+", find_tr[x].text))
for y in range(1, len(find_tr) + 1):
result = results[-y]
try:
open_nums = result[0][:5]
open_date = result[8][-4:] + "-" + result[9] + "-" + result[10]
except IndexError as e:
print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧")
exit()
temp = 5
for z in range(6):
# 找到每个号码
find_number = result[0][temp:temp + 2]
red_nums = red_nums + find_number + " "
# 更新这个号码出现的次数
# red_ball_show_times格式:(0,)
red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
where_condition="open_number = {}".format(
find_number),
want_get_info="red_ball_times")[0][0]
self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1,
where_condition="open_number = {}".format(find_number),
field="red_ball_times")
temp += 2
blue_nums = result[0][-2:]
blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
where_condition="open_number = {}".format(
blue_nums),
want_get_info="blue_ball_times")[0][0]
self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1,
where_condition="open_number = {}".format(blue_nums),
field="blue_ball_times")
# 开始插入数据
records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date",
where_condition='open_date = "{}"'.format(open_date))
if len(records_result) == 0:
self.mysql.mysql_insert_data(table_name="open_records",
values=(open_date, open_nums, red_nums, blue_nums),
fields=("open_date", "open_nums", "red_numbers", "blue_number"))
# 重制下变量
red_nums = ""
blue_nums = ""
def recommend_numbers(self, recommend_count):
"""
推荐6个红球和1个蓝球号码。
第一组:根据记录,找到最常出现的6个红球,1个蓝球
第二~组:完全是随机的。
:param recommend_count: 需要推荐组的数量
:return: 返回一个list,每一组推荐都是一个tuple
"""
# 从 open_number_count 表通过排序方式找到6个红球号码和一个蓝球号码
recommend_numbers = []
red_balls = []
usual_red_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number",
order_condition="red_ball_times desc", return_num=6)
usual_blue_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number",
order_condition="blue_ball_times desc", return_num=1)
for x in usual_red_balls:
red_balls.append(int(x[0]))
# 进行排序
red_balls_sort = self.numbers_sort(red_balls)
# 把多次出现的推荐组合添加到数组中
recommend_numbers.append((red_balls_sort, int(usual_blue_balls[0][0])))
# 把随机推荐的组合添加到数组中
recommend_numbers.append(self.get_random_recommend_numbers(recommend_count))
# 把数据插入到数据库
new_open_info = self.calculation_open_date_and_nums()
# 为了避免多次执行导致插入多条,这里做个判断,如果数据已存在"open_nums",那么就执行更新,不插入
is_exists = self.mysql.mysql_select_data(table_name="recommend_records", want_get_info="open_nums",
where_condition="open_nums = {}".format(new_open_info[1]))
if len(is_exists) > 0:
self.mysql.mysql_update_data(table_name="recommend_records", field="recommend_nums",
value=str(recommend_numbers),
where_condition="open_nums = {}".format(new_open_info[1]))
else:
self.mysql.mysql_insert_data(table_name="recommend_records",
fields=("open_date", "open_nums", "recommend_nums"),
values=(*new_open_info, str(recommend_numbers)))
# 发送一下推荐组合
RequestsFunctions.messages_push(message_title="期号:'{}' 推荐组合".format(new_open_info[1]),
message_text=str(recommend_numbers))
return recommend_numbers
def get_random_recommend_numbers(self, recommend_count):
"""
获取随机推荐的号码
:param recommend_count: 需要推荐组的数量
:return: 返回N组推荐号码,是一个list
"""
# 红区是6位「1~33」,蓝区是1位「1~16」
random_recommend_list = []
for x in range(int(recommend_count)):
red_balls = []
while True:
if len(red_balls) == 6:
break
red_number = random.randint(1, 33)
if red_number in red_balls:
continue
else:
red_balls.append(red_number)
# 使用冒泡排序把数据整理一下,从大到小排列
self.numbers_sort(red_balls)
blue_ball = random.randint(1, 16)
random_recommend_list.append((red_balls, blue_ball))
return random_recommend_list
def numbers_sort(self, nums: list):
"""
给数据排序,使用冒泡排序算法
:param nums: 数据,需要传入一个list
:return: 返回排序好的list,从大到小
"""
for x in range(len(nums)):
for y in range(0, len(nums) - 1):
if nums[y] > nums[y + 1]:
nums[y], nums[y + 1] = nums[y + 1], nums[y]
return nums
def calculation_open_date_and_nums(self):
"""
计算开奖日期和期号
:return: 返回一个list,0:开奖日期,1:是开奖期号
"""
# 由于会先更新数据,再推荐,所以直接取最新的开奖期号和开奖日期
db_last_open_info = \
self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date, open_nums",
order_condition="id desc", return_num=1)[0]
# 开奖期号+1即可,开奖日期在星期二、四、日,所以需要获取当前时间,然后再确定+2还是+3
new_nums = str(int(db_last_open_info[1]) + 1)
# 获取当前日期
current_weekday = datetime.datetime.now().weekday() + 1
db_last_open_date = datetime.datetime.strptime(db_last_open_info[0], "%Y-%m-%d").date()
new_open_date = ""
if current_weekday in (1, 2):
new_open_date = db_last_open_date + datetime.timedelta(days=+2)
elif current_weekday in (3, 4):
new_open_date = db_last_open_date + datetime.timedelta(days=+2)
elif current_weekday in (5, 6, 7):
new_open_date = db_last_open_date + datetime.timedelta(days=+3)
return [str(new_open_date), new_nums]
def init_database(self, open_num):
"""
初始化数据库,插入open_records,open_number_count
:param open_num: 开奖的期号
:return:
"""
# 先给 open_number_count 初始化数据
for num in range(1, 34):
self.mysql.mysql_insert_data(table_name="open_number_count",
fields=("open_number", "red_ball_times", "blue_ball_times"),
values=(str(num), 0, 0))
open_nums = ""
open_date = ""
red_nums = ""
blue_nums = ""
url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(open_num)
req = requests.get(url=url)
# 转码一下
req.encoding = "utf-8"
# 找到所有tr标签的数据
find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1")
results = []
for x in range(len(find_tr)):
results.append(re.findall(r"\d+", find_tr[x].text))
for y in range(1, len(find_tr) + 1):
result = results[-y]
try:
open_nums = result[0][:5]
open_date = result[8][-4:] + "-" + result[9] + "-" + result[10]
except IndexError as e:
print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧")
exit()
temp = 5
for z in range(6):
# 找到每个号码
find_number = result[0][temp:temp + 2]
red_nums = red_nums + find_number + " "
# 更新这个号码出现的次数
# red_ball_show_times格式:(0,)
red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
where_condition="open_number = {}".format(
find_number),
want_get_info="red_ball_times")[0][0]
self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1,
where_condition="open_number = {}".format(find_number),
field="red_ball_times")
temp += 2
blue_nums = result[0][-2:]
blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count",
where_condition="open_number = {}".format(
blue_nums),
want_get_info="blue_ball_times")[0][0]
self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1,
where_condition="open_number = {}".format(blue_nums),
field="blue_ball_times")
# 开始插入数据
records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date",
where_condition='open_date = "{}"'.format(open_date))
if len(records_result) == 0:
self.mysql.mysql_insert_data(table_name="open_records",
values=(open_date, open_nums, red_nums, blue_nums),
fields=("open_date", "open_nums", "red_numbers", "blue_number"))
else:
self.mysql.mysql_update_data(table_name="open_records",
value=str(blue_nums),
field="blue_number",
where_condition='open_date = "{}"'.format(open_date))
# 重制下变量
red_nums = ""
blue_nums = ""
main.py
不解释了
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from base.base_functions import BASEFUNCTIONS
import os
if __name__ == '__main__':
# 删掉logs文件
logs_path = "./logs/mysql_functions_logs.tx
if os.path.exists(logs_path):
os.remove(logs_path)
bf = BASEFUNCTIONS()
bf.update_open_records()
print(bf.recommend_numbers(2))
main.shell
这里说两句:
- pipenv run 执行时会自动进入虚拟环境,执行后自动退出虚拟环境
- 如果把代码上传到代码托管平台,gitlab、gtiee、github 等,切记一定要使用 igitignore 不然会后悔的!!!
- pipenv install 类似 "requirements.txt" 但是比他好用多了,推荐使用 pipenv 管理虚拟环境,安装库,管理 python 版本,功能强大,你值得拥有!
添加定时任务
centos 的定时任务需要安装 crontab,所以先安装他
yum install crontabs
crontab 常用参数:
- -l:查看当前定时任务
- -e:编辑定时任务
使用 crontab -e,然后在最末位添加:0 18 * * 1,3,5 bash /具体路径/double_color_ball/main.shell,即可
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于