giftiaのblog

b+树和跳表

· giftia

b+树和跳表

  • mysql 中使用 b+ 树来存储数据。
  • redis 的有序集合使用跳表来实现。

b+树

img

  • 只有叶子节点才存储数据,中间节点只存储索引。
  • 非叶子节点的索引可以有多个,从而实现范围查询。
  • 叶子节点采用单向链表连接。

跳表

img

  • 跳表的每个节点(除头节点和尾节点)都存储数据,同时维护指向其他节点的指针。
  • 跳表的最低层是完整的双向链表,存储所有数据节点;高层是稀疏的单向链表,用于加速查找。

为什么innoDB使用b+树,redis使用跳表?

  • b+树的高度更低,io少,适合磁盘存储(redis基于内存,io影响不大)。
  • 范围查询,b+树支持,跳表不支持。
  • 跳表实现较简单,b+树实现较复杂。
维度InnoDB(B + 树)Redis(跳表)
存储介质磁盘(I/O 成本高)内存(访问速度极快)
数据规模海量数据(GB 级 +)中小规模数据(GB 级 -)
查询类型范围查询为主,需支持复杂 SQL单点查询为主,少量范围查询(如排行榜)
更新频率相对较低(写操作涉及磁盘 I/O)高(内存操作无 I/O 延迟)
实现复杂度高(需平衡树结构、事务、持久化)低(随机层级 + 简单指针操作)
核心目标数据持久化、复杂查询性能、稳定性内存高性能、简单数据结构、快速读写

go 实现跳表

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

const (
	maxLevel    = 32  // 最大层级
	probability = 0.25 // 晋升概率,控制层级分布
)

// SkipList 跳表结构
type SkipList struct {
	header       *Node     // 头节点
	tail         *Node     // 尾节点
	length       int       // 元素数量
	level        int       // 当前最大层级
	randSource   rand.Source // 随机数生成器
	probTable    []float64   // 预计算的概率表
}

// Node 跳表节点结构
type Node struct {
	member   string      // 成员名称
	score    float64     // 分数,排序依据
	backward *Node       // 后退指针
	level    []*NodeLevel // 层级数组
}

// NodeLevel 节点层级结构
type NodeLevel struct {
	forward *Node // 前向指针
	span    int   // 跨度
}

// NewSkipList 创建一个新的跳表实例
func NewSkipList() *SkipList {
	// 初始化随机数生成器
	randSource := rand.NewSource(time.Now().UnixNano())
	
	// 创建头节点和尾节点
	header := &Node{
		level: make([]*NodeLevel, maxLevel),
	}
	for i := 0; i < maxLevel; i++ {
		header.level[i] = &NodeLevel{
			forward: nil,
			span:    0,
		}
	}
	
	tail := &Node{}
	
	// 初始化头节点的所有层级都指向尾节点
	for i := 0; i < maxLevel; i++ {
		header.level[i].forward = tail
	}
	
	// 预计算概率表,用于快速计算随机层级
	probTable := make([]float64, maxLevel)
	for i := 0; i < maxLevel; i++ {
		probTable[i] = math.Pow(probability, float64(i))
	}
	
	return &SkipList{
		header:     header,
		tail:       tail,
		length:     0,
		level:      1,
		randSource: randSource,
		probTable:  probTable,
	}
}

// randomLevel 生成随机层级
func (sl *SkipList) randomLevel() int {
	r := float64(rand.New(sl.randSource).Int63()) / (1 << 63)
	level := 1
	
	for level < maxLevel && r < sl.probTable[level] {
		level++
	}
	
	if level > sl.level {
		sl.level = level
	}
	
	return level
}

// Insert 插入新元素
func (sl *SkipList) Insert(score float64, member string) {
	// 查找插入位置并记录路径
	update := make([]*Node, maxLevel)
	rank := make([]int, maxLevel)
	
	x := sl.header
	for i := sl.level - 1; i >= 0; i-- {
		if i == sl.level - 1 {
			rank[i] = 0
		} else {
			rank[i] = rank[i + 1]
		}
		
		for x.level[i].forward != sl.tail && 
		    (x.level[i].forward.score < score || 
		     (x.level[i].forward.score == score && x.level[i].forward.member < member)) {
			rank[i] += x.level[i].span
			x = x.level[i].forward
		}
		update[i] = x
	}
	
	// 创建新节点
	newLevel := sl.randomLevel()
	x = &Node{
		member:   member,
		score:    score,
		backward: nil,
		level:    make([]*NodeLevel, newLevel),
	}
	
	// 初始化新节点的层级
	for i := 0; i < newLevel; i++ {
		x.level[i] = &NodeLevel{
			forward: nil,
			span:    0,
		}
	}
	
	// 更新指针和跨度
	for i := 0; i < newLevel; i++ {
		x.level[i].forward = update[i].level[i].forward
		update[i].level[i].forward = x
		
		// 更新跨度
		x.level[i].span = update[i].level[i].span - (rank[0] - rank[i])
		update[i].level[i].span = (rank[0] - rank[i]) + 1
	}
	
	// 处理未触及的层级
	for i := newLevel; i < sl.level; i++ {
		update[i].level[i].span++
	}
	
	// 设置后退指针
	if update[0] == sl.header {
		x.backward = nil
	} else {
		x.backward = update[0]
	}
	
	if x.level[0].forward != sl.tail {
		x.level[0].forward.backward = x
	} else {
		sl.tail = x
	}
	
	sl.length++
}

// Delete 删除元素
func (sl *SkipList) Delete(score float64, member string) bool {
	// 查找要删除的节点并记录路径
	update := make([]*Node, maxLevel)
	x := sl.header
	
	for i := sl.level - 1; i >= 0; i-- {
		for x.level[i].forward != sl.tail && 
		    (x.level[i].forward.score < score || 
		     (x.level[i].forward.score == score && x.level[i].forward.member < member)) {
			x = x.level[i].forward
		}
		update[i] = x
	}
	
	// 检查是否找到要删除的节点
	x = x.level[0].forward
	if x == sl.tail || x.score != score || x.member != member {
		return false
	}
	
	// 更新指针和跨度
	for i := 0; i < sl.level; i++ {
		if update[i].level[i].forward != x {
			break
		}
		update[i].level[i].span += x.level[i].span - 1
		update[i].level[i].forward = x.level[i].forward
	}
	
	// 更新后退指针
	if x.level[0].forward != sl.tail {
		x.level[0].forward.backward = x.backward
	} else {
		sl.tail = x.backward
	}
	
	// 更新跳表的最大层级
	for sl.level > 1 && sl.header.level[sl.level-1].forward == sl.tail {
		sl.level--
	}
	
	sl.length--
	return true
}

// GetRank 获取元素排名(从1开始)
func (sl *SkipList) GetRank(score float64, member string) int {
	rank := 0
	x := sl.header
	
	for i := sl.level - 1; i >= 0; i-- {
		for x.level[i].forward != sl.tail && 
		    (x.level[i].forward.score < score || 
		     (x.level[i].forward.score == score && x.level[i].forward.member <= member)) {
			rank += x.level[i].span
			x = x.level[i].forward
			
			// 如果找到完全匹配的元素,返回排名
			if x.score == score && x.member == member {
				return rank
			}
		}
	}
	
	return 0 // 未找到元素
}

// GetElementByRank 根据排名获取元素
func (sl *SkipList) GetElementByRank(rank int) (string, float64, bool) {
	if rank <= 0 || rank > sl.length {
		return "", 0, false
	}
	
	accumulatedRank := 0
	x := sl.header
	
	for i := sl.level - 1; i >= 0; i-- {
		for x.level[i].forward != sl.tail && accumulatedRank + x.level[i].span <= rank {
			accumulatedRank += x.level[i].span
			x = x.level[i].forward
		}
		
		if accumulatedRank == rank {
			return x.member, x.score, true
		}
	}
	
	return "", 0, false // 未找到
}

// Find 查找元素
func (sl *SkipList) Find(score float64, member string) bool {
	x := sl.header
	
	for i := sl.level - 1; i >= 0; i-- {
		for x.level[i].forward != sl.tail && 
		    (x.level[i].forward.score < score || 
		     (x.level[i].forward.score == score && x.level[i].forward.member < member)) {
			x = x.level[i].forward
		}
	}
	
	x = x.level[0].forward
	return x != sl.tail && x.score == score && x.member == member
}

// Range 查询范围内的元素
func (sl *SkipList) Range(minScore, maxScore float64) []string {
	result := make([]string, 0)
	x := sl.header
	
	// 找到第一个大于等于minScore的节点
	for i := sl.level - 1; i >= 0; i-- {
		for x.level[i].forward != sl.tail && x.level[i].forward.score < minScore {
			x = x.level[i].forward
		}
	}
	
	// 从该节点开始遍历,收集所有小于等于maxScore的节点
	x = x.level[0].forward
	for x != sl.tail && x.score <= maxScore {
		result = append(result, fmt.Sprintf("(%s, %.2f)", x.member, x.score))
		x = x.level[0].forward
	}
	
	return result
}

// Len 返回跳表长度
func (sl *SkipList) Len() int {
	return sl.length
}

// Example 使用示例
func main() {
	// 创建跳表
	sl := NewSkipList()
	
	// 插入元素
	sl.Insert(85.5, "player1")
	sl.Insert(90.0, "player2")
	sl.Insert(78.2, "player3")
	sl.Insert(92.3, "player4")
	sl.Insert(88.1, "player5")
	
	// 查询元素
	fmt.Println("查找player3:", sl.Find(78.2, "player3")) // 输出: true
	fmt.Println("查找player6:", sl.Find(95.0, "player6")) // 输出: false
	
	// 获取排名
	fmt.Println("player4的排名:", sl.GetRank(92.3, "player4")) // 输出: 1
	
	// 根据排名获取元素
	member, score, ok := sl.GetElementByRank(3)
	if ok {
		fmt.Printf("排名第3的元素: %s (%.2f)\n", member, score) // 输出: player1 (85.50)
	}
	
	// 范围查询
	fmt.Println("分数在80-90之间的元素:", sl.Range(80.0, 90.0)) 
	// 输出: [(player1, 85.50), (player5, 88.10)]
	
	// 删除元素
	sl.Delete(90.0, "player2")
	fmt.Println("删除player2后,player2的排名:", sl.GetRank(90.0, "player2")) // 输出: 0
}