fix(weixin): persist context tokens to disk to survive restarts (#2124)

This commit is contained in:
champly
2026-03-28 20:23:14 +08:00
committed by GitHub
parent 30155c1c59
commit 11dec0c80a
2 changed files with 97 additions and 18 deletions
+40 -6
View File
@@ -36,6 +36,10 @@ type syncCursorFile struct {
GetUpdatesBuf string `json:"get_updates_buf"`
}
type contextTokensFile struct {
Tokens map[string]string `json:"tokens"`
}
func picoclawHomeDir() string {
if home := os.Getenv(config.EnvHome); home != "" {
return home
@@ -44,14 +48,21 @@ func picoclawHomeDir() string {
return filepath.Join(userHome, ".picoclaw")
}
func buildWeixinSyncBufPath(cfg config.WeixinConfig) string {
key := "default"
func genWeixinAccountKey(cfg config.WeixinConfig) string {
token := strings.TrimSpace(cfg.Token.String())
if token != "" {
sum := sha256.Sum256([]byte(strings.TrimSpace(cfg.BaseURL) + "|" + token))
key = hex.EncodeToString(sum[:8])
if token == "" {
return "default"
}
return filepath.Join(picoclawHomeDir(), "channels", "weixin", "sync", key+".json")
sum := sha256.Sum256([]byte(strings.TrimSpace(cfg.BaseURL) + "|" + token))
return hex.EncodeToString(sum[:8])
}
func buildWeixinSyncBufPath(cfg config.WeixinConfig) string {
return filepath.Join(picoclawHomeDir(), "channels", "weixin", "sync", genWeixinAccountKey(cfg)+".json")
}
func buildWeixinContextTokensPath(cfg config.WeixinConfig) string {
return filepath.Join(picoclawHomeDir(), "channels", "weixin", "context-tokens", genWeixinAccountKey(cfg)+".json")
}
func loadGetUpdatesBuf(path string) (string, error) {
@@ -79,6 +90,29 @@ func saveGetUpdatesBuf(path, cursor string) error {
return fileutil.WriteFileAtomic(path, data, 0o600)
}
func loadContextTokens(path string) (map[string]string, error) {
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var decoded contextTokensFile
if err := json.Unmarshal(data, &decoded); err != nil {
return nil, err
}
return decoded.Tokens, nil
}
func saveContextTokens(path string, tokens map[string]string) error {
data, err := json.Marshal(contextTokensFile{Tokens: tokens})
if err != nil {
return err
}
return fileutil.WriteFileAtomic(path, data, 0o600)
}
func (c *WeixinChannel) cdnBaseURL() string {
if base := strings.TrimSpace(c.config.CDNBaseURL); base != "" {
return strings.TrimRight(base, "/")
+57 -12
View File
@@ -26,12 +26,13 @@ type WeixinChannel struct {
bus *bus.MessageBus
// contextTokens stores the last context_token per user (from_user_id → context_token).
// This is required by the iLink API to associate replies with the right chat session.
contextTokens sync.Map
typingMu sync.Mutex
typingCache map[string]typingTicketCacheEntry
pauseMu sync.Mutex
pauseUntil time.Time
syncBufPath string
contextTokens sync.Map
typingMu sync.Mutex
typingCache map[string]typingTicketCacheEntry
pauseMu sync.Mutex
pauseUntil time.Time
syncBufPath string
contextTokensPath string
}
func init() {
@@ -57,12 +58,13 @@ func NewWeixinChannel(cfg config.WeixinConfig, messageBus *bus.MessageBus) (*Wei
)
return &WeixinChannel{
BaseChannel: base,
api: api,
config: cfg,
bus: messageBus,
typingCache: make(map[string]typingTicketCacheEntry),
syncBufPath: buildWeixinSyncBufPath(cfg),
BaseChannel: base,
api: api,
config: cfg,
bus: messageBus,
typingCache: make(map[string]typingTicketCacheEntry),
syncBufPath: buildWeixinSyncBufPath(cfg),
contextTokensPath: buildWeixinContextTokensPath(cfg),
}, nil
}
@@ -70,11 +72,53 @@ func (c *WeixinChannel) Start(ctx context.Context) error {
logger.InfoC("weixin", "Starting Weixin channel")
c.ctx, c.cancel = context.WithCancel(ctx)
c.SetRunning(true)
c.restoreContextTokens()
go c.pollLoop(c.ctx)
logger.InfoC("weixin", "Weixin channel started")
return nil
}
// restoreContextTokens loads persisted context tokens from disk into memory.
func (c *WeixinChannel) restoreContextTokens() {
tokens, err := loadContextTokens(c.contextTokensPath)
if err != nil {
logger.WarnCF("weixin", "Failed to load persisted context tokens", map[string]any{
"path": c.contextTokensPath,
"error": err.Error(),
})
return
}
if len(tokens) == 0 {
return
}
for userID, token := range tokens {
c.contextTokens.Store(userID, token)
}
logger.InfoCF("weixin", "Restored context tokens from disk", map[string]any{
"path": c.contextTokensPath,
"count": len(tokens),
})
}
// persistContextTokens saves all in-memory context tokens to disk.
func (c *WeixinChannel) persistContextTokens() {
tokens := make(map[string]string)
c.contextTokens.Range(func(k, v any) bool {
if userID, ok := k.(string); ok {
if token, ok := v.(string); ok {
tokens[userID] = token
}
}
return true
})
if err := saveContextTokens(c.contextTokensPath, tokens); err != nil {
logger.WarnCF("weixin", "Failed to persist context tokens", map[string]any{
"path": c.contextTokensPath,
"error": err.Error(),
})
}
}
func (c *WeixinChannel) Stop(ctx context.Context) error {
logger.InfoC("weixin", "Stopping Weixin channel")
c.SetRunning(false)
@@ -307,6 +351,7 @@ func (c *WeixinChannel) handleInboundMessage(ctx context.Context, msg WeixinMess
// Store context_token for outbound reply association
if msg.ContextToken != "" {
c.contextTokens.Store(fromUserID, msg.ContextToken)
c.persistContextTokens()
}
c.HandleMessage(ctx, peer, messageID, fromUserID, fromUserID, content, mediaRefs, metadata, sender)