◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。
etl(提取、转换、加载)过程是有效管理数据的基础,特别是在需要基于实时数据快速决策的应用程序中。在本文中,我们将使用涉及币安 api 的实时加密货币交易的实际示例来探索 etl 流程。提供的 python 代码说明了如何提取交易数据、将其转换为可用格式、将其加载到 sqlite 数据库中,以及通过实时绘图可视化数据。
示例 etl 项目: https://github.com/vcse59/featureengineering/tree/main/real-time-cryptocurrency-price-tracker
1。提取物
etl 过程的第一步是提取,其中涉及从各种来源收集数据。在这种情况下,数据是通过与 binance testnet api 的 websocket 连接提取的。此连接允许实时传输 btc/usdt 交易。
代码中提取的实现方式如下:
with websockets.connect(url) as ws: response = await ws.recv() trade_data = json.loads(response)
收到的每条消息都包含必要的交易数据,包括价格、数量和时间戳,格式为 json。
2。变形
提取数据后,它会经历转换过程。此步骤清理并结构化数据以使其更有用。在我们的示例中,转换包括将时间戳从毫秒转换为可读格式,并将数据组织为适当的类型以供进一步处理。
price = float(trade_data['p']) quantity = float(trade_data['q']) timestamp = int(trade_data['t']) trade_time = datetime.fromtimestamp(timestamp / 1000.0)
这确保了价格和数量存储为浮点数,并且时间戳被转换为日期时间对象,以便于操作和分析。
3。加载
最后一步是加载,将转换后的数据存储在目标数据库中。在我们的代码中,sqlite 数据库作为交易数据的存储介质。
加载过程由以下函数管理:
def save_trade_to_db(price, quantity, timestamp): conn = sqlite3.connect('trades.db') cursor = conn.cursor() # create a table if it doesn't exist cursor.execute(''' create table if not exists trades ( id integer primary key autoincrement, price real, quantity real, timestamp text ) ''') # insert the trade data cursor.execute(''' insert into trades (price, quantity, timestamp) values (?, ?, ?) ''', (price, quantity, trade_time)) conn.commit() conn.close()
此函数连接到 sqlite 数据库,如果不存在则创建一个表,并插入交易数据。
4。可视化
除了存储数据之外,将数据可视化以便更好地理解和决策也很重要。提供的代码包含一个实时绘制交易的函数:
def plot_trades(): if len(trades) > 0: timestamps, prices, quantities = zip(*trades) plt.subplot(2, 1, 1) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, prices, label='Price', color='blue') plt.ylabel('Price (USDT)') plt.legend() plt.title('Real-Time BTC/USDT Prices') plt.xticks(rotation=45) plt.subplot(2, 1, 2) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, quantities, label='Quantity', color='orange') plt.ylabel('Quantity') plt.xlabel('Time') plt.legend() plt.xticks(rotation=45) plt.tight_layout() # Adjust layout for better spacing plt.pause(0.1) # Pause to update the plot
此函数生成两个子图:一个用于价格,另一个用于数量。它使用matplotlib库动态可视化数据,让用户实时观察市场趋势。
结论
此示例重点介绍了 etl 过程,演示了如何从 websocket api 中提取数据、进行转换以进行分析、加载到数据库中以及如何进行可视化以获取即时反馈。该框架对于构建需要基于实时数据做出明智决策的应用程序至关重要,例如交易平台和市场分析工具。
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。