我更新了 BoardGameGeek 数据的 Python 获取器

ID:19437 / 打印

我更新了 boardgamegeek 数据的 python 获取器

此脚本将从 boardgamegeek api 获取项目数据并将数据存储在 csv 文件中。

我更新了之前的脚本。由于 api 响应采用 xml 格式,并且没有端点可以一次获取所有项目,因此前面的脚本将循环遍历提供的 id 范围,对每个项目进行逐一调用。这不是最优的,对于较大范围的 id 来说需要很长时间(目前 bgg 上可用的项目 (id) 的最高数量高达 400k+),并且结果可能不可靠。因此,通过对此脚本的一些修改,更多的项目id将作为参数值添加到单个请求url中,这样,单个响应将返回多个项目(〜800是单个响应返回的最高数量。bgg稍后可能会更改它;您可以轻松调整batch_size以便根据需要进行调整)。

此外,此脚本将获取所有项目,而不仅仅是与棋盘游戏相关的数据。

为每个棋盘游戏获取和存储的信息如下:

立即学习“Python免费学习笔记(深入)”;

名称、游戏 id、类型、评级、权重、发布年份、最小玩家数、最大玩家数、最短游戏时间、最大支付时间、最小年龄、所属者、类别、机制、设计师、艺术家和发行商。

该脚本的更新如下;我们首先导入此脚本所需的库:

# import libraries from bs4 import beautifulsoup from csv import dictwriter import pandas as pd import requests import time 

以下是脚本完成时根据 id 范围调用的函数。此外,如果发出请求时出现错误,将调用此函数以存储截至异常发生时附加到游戏列表的所有数据。

# csv file saving function def save_to_csv(games):     csv_header = [         'name', 'game_id', 'type', 'rating', 'weight', 'year_published', 'min_players', 'max_players',         'min_play_time', 'max_play_time', 'min_age', 'owned_by', 'categories',         'mechanics', 'designers', 'artists', 'publishers'     ]     with open('bgg.csv', 'a', encoding='utf8') as f:         dictwriter_object = dictwriter(f, fieldnames=csv_header)         if f.tell() == 0:             dictwriter_object.writeheader()         dictwriter_object.writerows(games) 

我们需要定义请求的标头。请求之间的暂停可以通过 sleep_between_requests 设置(我看到一些信息说速率限制是每秒 2 个请求,但它可能是过时的信息,因为我将暂停设置为 0 没有遇到问题)。另外,这里设置起始点id(start_id_range)、最大范围(max_id_range)和batch_size的值,batch_size是响应应该返回的游戏数量。基本 url 在本节中定义,但 id 在脚本的下一部分中添加。

# define request url headers headers = {     "user-agent": "mozilla/5.0 (macintosh; intel mac os x 10.16; rv:85.0) gecko/20100101 firefox/85.0",     "accept-language": "en-gb, en-us, q=0.9, en" }  # define sleep timer value between requests sleep_between_request = 0  # define max id range start_id_range = 0 max_id_range = 403000 batch_size = 800 base_url = "https://boardgamegeek.com/xmlapi2/thing?id=" 

下面是这个脚本的主要逻辑。首先,根据批量大小,它会生成一个在定义的 id 范围内的 id 字符串,但 id 的数量不得超过 batch_size 中定义的数量,并将其附加到 url 的 id 参数中。这样,每个响应将返回与批量大小相同的项目数量的数据。之后,它将处理数据并将其附加到每个响应的游戏列表中,最后附加到 csv 文件中。

# main loop that will iterate between the starting and maximum range in intervals of the batch size for batch_start in range(start_id_range, max_id_range, batch_size):     # make sure that the batch size will not exceed the maximum ids range     batch_end = min(batch_start + batch_size - 1, max_id_range)     # join and append to the url the ids within batch size     ids = ",".join(map(str, range(batch_start, batch_end + 1)))     url = f"{base_url}?id={ids}&stats=1"      # if by any chance there is an error, this will throw the exception and continue on the next batch     try:         response = requests.get(url, headers=headers)     except exception as err:         print(err)         continue       if response.status_code == 200:         soup = beautifulsoup(response.text, features="html.parser")         items = soup.find_all("item")         games = []         for item in items:             if item:                 try:                     # find values in the xml                     name = item.find("name")['value'] if item.find("name") is not none else 0                     year_published = item.find("yearpublished")['value'] if item.find("yearpublished") is not none else 0                     min_players = item.find("minplayers")['value'] if item.find("minplayers") is not none else 0                     max_players = item.find("maxplayers")['value'] if item.find("maxplayers") is not none else 0                     min_play_time = item.find("minplaytime")['value'] if item.find("minplaytime") is not none else 0                     max_play_time = item.find("maxplaytime")['value'] if item.find("maxplaytime") is not none else 0                     min_age = item.find("minage")['value'] if item.find("minage") is not none else 0                     rating = item.find("average")['value'] if item.find("average") is not none else 0                     weight = item.find("averageweight")['value'] if item.find("averageweight") is not none else 0                     owned = item.find("owned")['value'] if item.find("owned") is not none else 0                       link_type = {'categories': [], 'mechanics': [], 'designers': [], 'artists': [], 'publishers': []}                      links = item.find_all("link")                      # append value(s) for each link type                     for link in links:                                                     if link['type'] == "boardgamecategory":                             link_type['categories'].append(link['value'])                         if link['type'] == "boardgamemechanic":                             link_type['mechanics'].append(link['value'])                         if link['type'] == "boardgamedesigner":                             link_type['designers'].append(link['value'])                         if link['type'] == "boardgameartist":                             link_type['artists'].append(link['value'])                         if link['type'] == "boardgamepublisher":                             link_type['publishers'].append(link['value'])                      # append 0 if there is no value for any link type                     for key, ltype in link_type.items():                         if not ltype:                             ltype.append("0")                      game = {                         "name": name,                         "game_id": item['id'],                         "type": item['type'],                         "rating": rating,                         "weight": weight,                         "year_published": year_published,                         "min_players": min_players,                         "max_players": max_players,                         "min_play_time": min_play_time,                         "max_play_time": max_play_time,                         "min_age": min_age,                         "owned_by": owned,                         "categories": ', '.join(link_type['categories']),                         "mechanics": ', '.join(link_type['mechanics']),                         "designers": ', '.join(link_type['designers']),                         "artists": ', '.join(link_type['artists']),                         "publishers": ', '.join(link_type['publishers']),                     }                      # append current item to games list                     games.append(game)                 except typeerror:                     print(">>> nonetype error. continued on the next item.")                     continue         save_to_csv(games)          print(f">>> request successful for batch {batch_start}-{batch_end}")     else:         print(f">>> failed batch {batch_start}-{batch_end}")      # pause between requests     time.sleep(sleep_between_request) 

下面您可以以 pandas dataframe 的形式预览 csv 文件中的前几行记录。

# Preview the CSV as pandas DataFrame df = pd.read_csv('./bgg.csv') print(df.head(5)) 
上一篇: 如何使用 Flask 和 Python 创建 RESTful API
下一篇: Python 封装:了解私有成员和受保护成员

作者:admin @ 24资源网   2025-01-14

本站所有软件、源码、文章均有网友提供,如有侵权联系308410122@qq.com

与本文相关文章

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。