Golang Redis 管道、WATCH 和事务

Redis 管道可以通过使用单个客户端-服务器-客户端往返执行多个命令来提高性能。与其一次执行 100 个命令,不如将这些命令排队到一个管道中,然后使用一次写入 + 读取操作执行排队的命令,就好像它是一个命令一样。

管道

使用单个写入 + 读取操作执行多个命令

pipe := rdb.Pipeline()

incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)

cmds, err := pipe.Exec(ctx)
if err != nil {
	panic(err)
}

// The value is available only after Exec is called.
fmt.Println(incr.Val())

或者,可以使用 Pipelined,它在函数退出时调用 Exec

var incr *redis.IntCmd

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	incr = pipe.Incr(ctx, "pipelined_counter")
	pipe.Expire(ctx, "pipelined_counter", time.Hour)
	return nil
})
if err != nil {
	panic(err)
}

// The value is available only after the pipeline is executed.
fmt.Println(incr.Val())

管道也会返回已执行的命令,因此可以迭代它们以检索结果

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	for i := 0; i < 100; i++ {
		pipe.Get(ctx, fmt.Sprintf("key%d", i))
	}
	return nil
})
if err != nil {
	panic(err)
}

for _, cmd := range cmds {
    fmt.Println(cmd.(*redis.StringCmd).Val())
}

Watch

使用 Redis 事务在新窗口中打开,可以监视键的变化,并且只有当被监视的键没有被其他客户端更改时才执行管道。这种冲突解决方法也称为 乐观锁在新窗口中打开

WATCH mykey

val = GET mykey
val = val + 1

MULTI
SET mykey $val
EXEC

事务

可以使用 TxPipelinedTxPipeline 使用 MULTI 和 EXEC 命令包装管道,但这本身并没有太大用处

cmds, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
	for i := 0; i < 100; i++ {
		pipe.Get(ctx, fmt.Sprintf("key%d", i))
	}
	return nil
})
if err != nil {
	panic(err)
}

// MULTI
// GET key0
// GET key1
// ...
// GET key99
// EXEC

相反,应该使用 Watch在新窗口中打开 来进行事务管道,例如,可以使用 GETSETWATCH 正确实现 INCR在新窗口中打开 命令。请注意我们如何使用 redis.TxFailedErr 检查事务是否失败。

// Redis transactions use optimistic locking.
const maxRetries = 1000

// Increment transactionally increments the key using GET and SET commands.
func increment(key string) error {
	// Transactional function.
	txf := func(tx *redis.Tx) error {
		// Get the current value or zero.
		n, err := tx.Get(ctx, key).Int()
		if err != nil && err != redis.Nil {
			return err
		}

		// Actual operation (local in optimistic lock).
		n++

		// Operation is commited only if the watched keys remain unchanged.
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, key, n, 0)
			return nil
		})
		return err
	}

    // Retry if the key has been changed.
	for i := 0; i < maxRetries; i++ {
		err := rdb.Watch(ctx, txf, key)
		if err == nil {
			// Success.
			return nil
		}
		if err == redis.TxFailedErr {
			// Optimistic lock lost. Retry.
			continue
		}
		// Return any other error.
		return err
	}

	return errors.New("increment reached maximum number of retries")
}

监控性能

监控 Redis 数据库的性能对于维护系统的整体健康、效率和可靠性至关重要。适当的性能监控有助于在潜在问题导致服务中断或性能下降之前识别并解决它们。

Uptrace 是一个 OpenTelemetry APM在新窗口中打开,它支持分布式跟踪、指标和日志。您可以使用它来监控应用程序并解决问题。

Uptrace Overview

Uptrace 带有直观的查询构建器、丰富的仪表盘、带有通知的警报规则以及大多数语言和框架的集成。

Uptrace 可以在一台服务器上处理数十亿个跨度和指标,并允许您以低 10 倍的成本监控您的应用程序。

您只需几分钟即可通过访问 云演示在新窗口中打开(无需登录)或使用 Docker在新窗口中打开 在本地运行它。源代码可在 GitHub在新窗口中打开 上获取。