路漫漫其修远兮
吾将上下而求索

AKShare数据采集简单分析功能

  • import os
    import tkinter as tk
    from tkinter import ttk, messagebox
    import akshare as ak
    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    import threading
    import matplotlib.pyplot as plt
    from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
    from sqlalchemy import create_engine, text
    from matplotlib.font_manager import FontProperties
    from scipy.signal import argrelextrema
    import time
    
    
    # ================= 1. 字体与环境配置 =================
    def init_sys_font():
        # 优先使用微软雅黑,增强中文显示效果
        for path in [r"C:\Windows\Fonts\msyh.ttc", r"C:\Windows\Fonts\simhei.ttf"]:
            if os.path.exists(path):
                return FontProperties(fname=path)
        return None
    
    
    MY_FONT = init_sys_font()
    if MY_FONT:
        plt.rcParams['font.sans-serif'] = [MY_FONT.get_name()]
        plt.rcParams['axes.unicode_minus'] = False
    
    # 全局变量
    GLOBAL_NAME_MAP = {}
    GLOBAL_TYPE_MAP = {}
    
    # ================= 2. 数据库配置 =================
    DB_CONFIG = {
        "host": "数据库ip", "port": 3306,
        "user": "用户名称", "password": "数据库密码", "database": "数据库名称"
    }
    engine_url = f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset=utf8mb4"
    # 配置连接池以应对远程数据库的不稳定性
    db_engine = create_engine(engine_url, pool_size=10, max_overflow=20, pool_recycle=3600, pool_pre_ping=True)
    
    
    # ================= 3. 算法核心:支撑压力与预警 =================
    
    def find_robust_levels(df, window=20, sensitivity=0.015):
        """
        通过局部极值并进行聚类合并,计算稳健的支撑压力位
        """
        if len(df) < window * 2: return [], []
        prices = df['收盘'].values
    
        # 获取局部极大/极小值索引
        max_idx = argrelextrema(prices, np.greater, order=window)[0]
        min_idx = argrelextrema(prices, np.less, order=window)[0]
    
        raw_res = sorted(prices[max_idx])
        raw_sup = sorted(prices[min_idx])
    
        def cluster_levels(levels):
            if not levels: return []
            clusters = []
            curr = levels[0]
            for i in range(1, len(levels)):
                # 如果两个价位间距小于 sensitivity (默认1.5%),则进行合并
                if (levels[i] - curr) / curr < sensitivity:
                    curr = (curr + levels[i]) / 2
                else:
                    clusters.append(curr)
                    curr = levels[i]
            clusters.append(curr)
            return clusters
    
        return cluster_levels(raw_res), cluster_levels(raw_sup)
    
    
    def check_price_alert(name, current_p, res_lvls, sup_lvls):
        """
        预警逻辑:检查价格是否处于关键的技术位附近
        """
        alerts = []
        # 支撑位检查
        for s in sup_lvls:
            ratio = (current_p - s) / s
            if 0 < ratio < 0.015:
                alerts.append(f"【{name}】接近支撑位 {s:.2f} (关注企稳)")
            elif -0.01 < ratio <= 0:
                alerts.append(f"【{name}】跌穿支撑 {s:.2f} (风险预警!)")
    
        # 压力位检查
        for r in res_lvls:
            ratio = (r - current_p) / current_p
            if 0 < ratio < 0.015:
                alerts.append(f"【{name}】接近压力位 {r:.2f} (警惕回落)")
    
        return " | ".join(alerts) if alerts else "● 价格目前处于震荡区间"
    
    
    # ================= 4. 数据处理逻辑 =================
    
    def update_status(msg, color="#2c3e50"):
        root.after(0, lambda: status_label.config(text=msg, foreground=color))
    
    
    def update_name_mapping_to_db():
        """补回的名称映射同步功能"""
        update_status("● 正在从全市场同步名称映射...", "#3498db")
    
        def task():
            try:
                df_s = ak.stock_zh_a_spot_em()[['代码', '名称']]
                df_s.columns = ['sec_code', 'sec_name']
                df_s['sec_type'] = 'STOCK'
    
                df_e = ak.fund_etf_spot_em()[['代码', '名称']]
                df_e.columns = ['sec_code', 'sec_name']
                df_e['sec_type'] = 'ETF'
    
                full_map = pd.concat([df_s, df_e], ignore_index=True).drop_duplicates(subset=['sec_code'])
                with db_engine.begin() as conn:
                    conn.execute(text("DELETE FROM name_mapping"))
                    full_map.to_sql('name_mapping', conn, if_exists='append', index=False)
    
                global GLOBAL_NAME_MAP
                GLOBAL_NAME_MAP = dict(zip(full_map['sec_code'], full_map['sec_name']))
                update_status(f"● 成功刷新 {len(GLOBAL_NAME_MAP)} 条证券名称", "#2ecc71")
                messagebox.showinfo("同步成功", "本地代码名称映射表已更新。")
            except Exception as e:
                update_status("● 名称同步失败", "#e74c3c")
                messagebox.showerror("错误", f"无法连接API或数据库: {e}")
    
        threading.Thread(target=task, daemon=True).start()
    
    
    def sync_data(specific_code=None, is_batch=False):
        code = specific_code if specific_code else entry_code.get().strip()
        if len(code) != 6: return
    
        if not is_batch:
            btn_run.config(state=tk.DISABLED)
            update_status(f"● 正在获取 {code} 历史数据...", "#3498db")
    
        def task():
            try:
                is_etf = code.startswith(('51', '58', '15', '16', '56'))
                name = GLOBAL_NAME_MAP.get(code, f"代码_{code}")
                table = "all_etf_data" if is_etf else "all_stock_data"
                code_col = "etf_code" if is_etf else "stock_code"
                name_col = "etf_name" if is_etf else "stock_name"
    
                with db_engine.connect() as conn:
                    last_d = conn.execute(text(f"SELECT MAX(日期) FROM {table} WHERE {code_col} = :c"),
                                          {"c": code}).scalar()
    
                start_date = (last_d + timedelta(days=1)).strftime('%Y%m%d') if last_d else "20220101"
                today_str = datetime.now().strftime('%Y%m%d')
    
                if not last_d or last_d < datetime.now().date():
                    df_new = ak.fund_etf_hist_em(symbol=code, start_date=start_date, end_date=today_str,
                                                 adjust="qfq") if is_etf \
                        else ak.stock_zh_a_hist(symbol=code, start_date=start_date, end_date=today_str, adjust="qfq")
    
                    if df_new is not None and not df_new.empty:
                        df_new['日期'] = pd.to_datetime(df_new['日期']).dt.date
                        df_new[code_col], df_new[name_col] = code, name
                        with db_engine.begin() as conn:
                            df_new.to_sql(table, conn, if_exists='append', index=False)
    
                if not is_batch:
                    full_df = pd.read_sql(text(f"SELECT * FROM {table} WHERE {code_col} = :c ORDER BY 日期"), db_engine,
                                          params={"c": code})
                    root.after(0, lambda: refresh_ui(full_df, name, code))
            except Exception as e:
                if not is_batch: update_status(f"● 错误: {str(e)[:30]}", "#e74c3c")
            finally:
                if not is_batch: root.after(0, lambda: btn_run.config(state=tk.NORMAL))
    
        threading.Thread(target=task, daemon=True).start()
    
    
    # ================= 5. UI 与图表绘制 =================
    
    def refresh_ui(df, name, code):
        if df is None or df.empty: return
        for i in tree.get_children(): tree.delete(i)
    
        latest = df.iloc[-1]
        curr_p = latest['收盘']
    
        # 算法计算与预警判定
        res_lvls, sup_lvls = find_robust_levels(df)
        alert_msg = check_price_alert(name, curr_p, res_lvls, sup_lvls)
    
        # 状态栏颜色联动
        status_color = "#e67e22" if "关注" in alert_msg else ("#e74c3c" if "风险" in alert_msg else "#2ecc71")
        update_status(alert_msg, status_color)
    
        y_df = df.tail(250)
        stats_label.config(
            text=f"【{name}】({code}) 最新:{curr_p} ({latest['涨跌幅']}%) | 250日高:{y_df['最高'].max()} 低:{y_df['最低'].min()}")
    
        # 插入表格数据
        for _, r in df.sort_values('日期', ascending=False).head(40).iterrows():
            tag = 'up' if r['涨跌幅'] > 0 else ('down' if r['涨跌幅'] < 0 else '')
            tree.insert("", "end", values=(r['日期'], r['收盘'], r['最高'], r['最低'], f"{r['涨跌幅']}%"), tags=(tag,))
    
        draw_chart(df, name, code, res_lvls, sup_lvls)
    
    
    def draw_chart(df, name, code, res_lvls, sup_lvls):
        for widget in chart_inner_frame.winfo_children(): widget.destroy()
        plt.close('all')
    
        df['日期'] = pd.to_datetime(df['日期'])
        plot_df = df.tail(120).copy()
        curr_p = df['收盘'].iloc[-1]
    
        fig = plt.Figure(figsize=(10, 8), dpi=100, facecolor='#f8f9fa')
        gs = fig.add_gridspec(3, 1, height_ratios=[3, 1, 1], hspace=0.15)
        ax1, ax2, ax3 = fig.add_subplot(gs[0]), fig.add_subplot(gs[1]), fig.add_subplot(gs[2])
    
        # 主图价格与均线
        ax1.plot(plot_df['日期'], plot_df['收盘'], color='#2980b9', lw=2, label='价格', zorder=5)
        for m, c in zip([5, 20, 60], ['#e67e22', '#27ae60', '#8e44ad']):
            ma = df['收盘'].rolling(m).mean().reindex(plot_df.index)
            ax1.plot(plot_df['日期'], ma, color=c, lw=1, label=f'MA{m}', alpha=0.5)
    
        # 绘制智能支撑压力线 (标签置于右侧防止干扰)
        active_res = [l for l in res_lvls if l > curr_p][:2]
        active_sup = [l for l in sup_lvls if l < curr_p][-2:]
    
        for l in active_res:
            ax1.axhline(l, color='#c0392b', ls='--', lw=1.2, alpha=0.6)
            ax1.text(plot_df['日期'].iloc[-1], l, f" 压:{l:.2f}", color='#c0392b', fontproperties=MY_FONT, va='bottom',
                     fontsize=9, fontweight='bold')
        for l in active_sup:
            ax1.axhline(l, color='#1e8449', ls='--', lw=1.2, alpha=0.6)
            ax1.text(plot_df['日期'].iloc[-1], l, f" 支:{l:.2f}", color='#1e8449', fontproperties=MY_FONT, va='top',
                     fontsize=9, fontweight='bold')
    
        ax1.legend(prop=MY_FONT, loc='upper left', frameon=False, ncol=4)
        ax1.grid(True, linestyle=':', alpha=0.4)
        ax1.set_title(f"{name} ({code}) 智能趋势分析", fontproperties=MY_FONT, fontsize=12)
    
        # 成交量与MACD省略部分与原版一致,增加视觉协调性
        v_colors = ['#e74c3c' if x > 0 else '#27ae60' for x in plot_df['涨跌幅']]
        ax2.bar(plot_df['日期'], plot_df['成交量'], color=v_colors, alpha=0.7)
    
        exp1 = plot_df['收盘'].ewm(span=12).mean()
        exp2 = plot_df['收盘'].ewm(span=26).mean()
        dif = exp1 - exp2
        dea = dif.ewm(span=9).mean()
        macd = (dif - dea) * 2
        ax3.bar(plot_df['日期'], macd, color=['#e74c3c' if v >= 0 else '#27ae60' for v in macd], alpha=0.6)
        ax3.plot(plot_df['日期'], dif, color='blue', lw=0.8, label='DIF')
        ax3.plot(plot_df['日期'], dea, color='orange', lw=0.8, label='DEA')
    
        for ax in [ax1, ax2]: ax.set_xticks([])
        ax3.tick_params(axis='x', rotation=30, labelsize=8)
    
        canvas = FigureCanvasTkAgg(fig, master=chart_inner_frame)
        canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
    
    
    # ================= 6. UI 事件逻辑 =================
    
    def on_key_release(event):
        if event.keysym in ("Up", "Down", "Return"): return
        pattern = entry_code.get().strip()
        suggestion_list.delete(0, tk.END)
        if len(pattern) >= 2:
            matches = [f"{c} | {n}" for c, n in GLOBAL_NAME_MAP.items() if pattern in c or pattern in n]
            if matches:
                # 修正:相对于 entry_code 的绝对位置定位提示框
                suggestion_list.place(x=entry_code.winfo_x(), y=entry_code.winfo_y() + entry_code.winfo_height() + 5,
                                      width=220)
                for m in matches[:10]: suggestion_list.insert(tk.END, m)
            else:
                suggestion_list.place_forget()
        else:
            suggestion_list.place_forget()
    
    
    def on_item_select(event):
        if not suggestion_list.curselection(): return
        val = suggestion_list.get(suggestion_list.curselection())
        entry_code.delete(0, tk.END)
        entry_code.insert(0, val.split(' | ')[0])
        suggestion_list.place_forget()
        sync_data()
    
    
    # --- 主界面启动 ---
    root = tk.Tk()
    root.title("金融快析 Pro v5.2 (预警与同步增强版)")
    root.geometry("1400x900")
    root.configure(bg='#f5f6fa')
    
    # 顶部工具栏
    top = ttk.Frame(root, padding=10);
    top.pack(fill=tk.X)
    ttk.Label(top, text="输入代码/名称:").pack(side=tk.LEFT)
    entry_code = ttk.Entry(top, width=20, font=('Consolas', 11))
    entry_code.pack(side=tk.LEFT, padx=5)
    entry_code.bind('<KeyRelease>', on_key_release)
    
    btn_run = ttk.Button(top, text="同步并查看", command=sync_data);
    btn_run.pack(side=tk.LEFT, padx=5)
    ttk.Button(top, text="全量同步库", command=lambda: sync_data()).pack(side=tk.LEFT, padx=5)
    # 重新加入的关键按钮
    ttk.Button(top, text="刷新名称映射", command=update_name_mapping_to_db).pack(side=tk.LEFT, padx=5)
    
    # 主显示区域
    content = ttk.PanedWindow(root, orient=tk.HORIZONTAL);
    content.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
    tree_frame = ttk.Frame(content)
    tree = ttk.Treeview(tree_frame, columns=('D', 'C', 'H', 'L', 'P'), show='headings')
    for c, t in zip(('D', 'C', 'H', 'L', 'P'), ('日期', '价格', '最高', '最低', '涨跌')):
        tree.heading(c, text=t);
        tree.column(c, width=80, anchor='center')
    tree.tag_configure('up', foreground='#e74c3c');
    tree.tag_configure('down', foreground='#27ae60')
    tree.pack(fill=tk.BOTH, expand=True);
    content.add(tree_frame, weight=1)
    
    chart_container = ttk.Frame(content)
    stats_label = ttk.Label(chart_container, text="系统准备就绪", font=('微软雅黑', 11, 'bold'));
    stats_label.pack(pady=5)
    chart_inner_frame = ttk.Frame(chart_container);
    chart_inner_frame.pack(fill=tk.BOTH, expand=True)
    content.add(chart_container, weight=4)
    
    # 状态栏
    status_label = ttk.Label(root, text="● 系统就绪", font=('微软雅黑', 10, 'bold'), padding=(10, 5))
    status_label.pack(side=tk.BOTTOM, anchor=tk.W)
    
    # 建议列表
    suggestion_list = tk.Listbox(root, height=8, font=('微软雅黑', 10), bd=1, relief="solid")
    suggestion_list.bind('<<ListboxSelect>>', on_item_select)
    
    
    # 初始化异步加载数据库名单
    def init_all():
        try:
            df = pd.read_sql("SELECT sec_code, sec_name FROM name_mapping", db_engine)
            global GLOBAL_NAME_MAP
            GLOBAL_NAME_MAP = dict(zip(df['sec_code'], df['sec_name']))
            update_status(f"● 本地已加载 {len(GLOBAL_NAME_MAP)} 只证券信息", "#2ecc71")
        except:
            update_status("● 初次运行请点击 [刷新名称映射]", "#f39c12")
    
    
    threading.Thread(target=init_all, daemon=True).start()
    root.mainloop()

 

未经允许不得转载:依林 » AKShare数据采集简单分析功能
分享到: 更多 (0)

相关推荐

  • 暂无文章

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址