Skip to content

Ruby 数据库访问

数据库是现代应用程序的核心组件,几乎所有的应用程序都需要与数据库进行交互来存储和检索数据。Ruby提供了多种方式来访问数据库,从原生的数据库驱动到高级的ORM框架。本章将详细介绍Ruby中数据库访问的各种方法和最佳实践。

🎯 数据库访问基础

数据库驱动程序

Ruby通过数据库驱动程序与各种数据库系统进行通信。每个数据库系统都有对应的Ruby驱动程序。

ruby
# SQLite database access.
# Install the driver first: gem install sqlite3

require 'sqlite3'

# Open the database file (per the original note, it is created automatically
# when it does not exist yet).
db = SQLite3::Database.new("example.db")

# Run a DDL statement; IF NOT EXISTS keeps the table creation repeatable.
db.execute <<-SQL
  CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    age INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  );
SQL

# Insert a row; the values are bound to the ? placeholders rather than being
# interpolated into the SQL text.
db.execute("INSERT INTO users (name, email, age) VALUES (?, ?, ?)", 
           ["张三", "zhangsan@example.com", 25])

# Query rows; each row is read by column position here.
results = db.execute("SELECT * FROM users")
results.each do |row|
  puts "ID: #{row[0]}, 姓名: #{row[1]}, 邮箱: #{row[2]}, 年龄: #{row[3]}"
end

# Switch to hash mode so that columns can be read by name.
db.results_as_hash = true
results = db.execute("SELECT * FROM users")
results.each do |row|
  puts "ID: #{row['id']}, 姓名: #{row['name']}, 邮箱: #{row['email']}"
end

# Close the database connection.
db.close

MySQL数据库访问

ruby
# MySQL database access.
# Install the driver first: gem install mysql2

require 'mysql2'

# Connect to the MySQL server.
client = Mysql2::Client.new(
  host: "localhost",
  username: "root",
  password: "password",
  database: "myapp_development"
)

# Create the table.
client.query <<-SQL
  CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    category VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  )
SQL

# Insert a row with a literal statement (no external input involved).
client.query("INSERT INTO products (name, price, category) VALUES ('苹果', 5.50, '水果')")

# Mysql2::Client#query does not bind parameters (its second argument is an
# options hash), so every statement with ? placeholders goes through
# prepare/execute. Binding values this way is what prevents SQL injection.
insert_stmt = client.prepare("INSERT INTO products (name, price, category) VALUES (?, ?, ?)")
insert_stmt.execute("香蕉", 3.00, "水果")

# Query rows.
select_stmt = client.prepare("SELECT * FROM products WHERE price > ?")
results = select_stmt.execute(4.00)
results.each do |row|
  puts "产品: #{row['name']}, 价格: #{row['price']}, 分类: #{row['category']}"
end

# Update rows.
update_stmt = client.prepare("UPDATE products SET price = ? WHERE name = ?")
update_stmt.execute(6.00, "苹果")

# Delete rows.
delete_stmt = client.prepare("DELETE FROM products WHERE name = ?")
delete_stmt.execute("香蕉")

# Statement metadata is read from the statement that did the work.
puts "受影响的行数: #{delete_stmt.affected_rows}"
puts "最后插入的ID: #{insert_stmt.last_id}"

# Release the prepared statements, then close the connection.
[insert_stmt, select_stmt, update_stmt, delete_stmt].each(&:close)
client.close

PostgreSQL数据库访问

ruby
# PostgreSQL database access.
# Install the driver first: gem install pg

require 'pg'

# Connect to the PostgreSQL server.
conn = PG.connect(
  host: "localhost",
  port: 5432,
  dbname: "myapp_development",
  user: "postgres",
  password: "password"
)

# Create the table.
conn.exec <<-SQL
  CREATE TABLE IF NOT EXISTS employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    department VARCHAR(100),
    salary NUMERIC(10, 2),
    hire_date DATE DEFAULT CURRENT_DATE
  )
SQL

# Insert a row; values are bound to the numbered $1..$n placeholders.
conn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
                 ["张三", "技术部", 15000.00])

# Query rows and read columns by name.
result = conn.exec_params("SELECT * FROM employees WHERE salary > $1", [10000])
result.each do |row|
  puts "员工: #{row['name']}, 部门: #{row['department']}, 薪资: #{row['salary']}"
end

# Update rows.
conn.exec_params("UPDATE employees SET salary = $1 WHERE name = $2", [16000.00, "张三"])

# Delete rows.
conn.exec_params("DELETE FROM employees WHERE name = $1", ["张三"])

# Run both inserts inside a single transaction block.
conn.transaction do |txn|
  txn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
                  ["李四", "销售部", 12000.00])
  txn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
                  ["王五", "人事部", 11000.00])
end

# Close the connection.
conn.close

🏗️ 数据库连接管理

连接池

ruby
# 使用连接池管理数据库连接
require 'sqlite3'
require 'thread'

# A fixed-size pool of SQLite connections handed out through a Queue.
# All connections are opened in the constructor; a caller waits in Queue#pop
# while every connection is checked out.
class DatabasePool
  def initialize(database_path, pool_size = 5)
    @database_path = database_path
    @pool_size = pool_size
    @pool = Queue.new
    @mutex = Mutex.new

    # Open every connection up front.
    @pool_size.times { @pool.push(create_connection) }
  end

  # Runs +sql+ with +params+ through Database#execute on a pooled connection
  # and returns whatever the driver returns.
  def execute(sql, params = [])
    with_pooled_connection { |connection| connection.execute(sql, params) }
  end

  # Same as #execute, but goes through Database#query.
  # NOTE(review): the connection goes back to the pool before the caller reads
  # the returned result -- confirm the driver's result object is safe to
  # consume at that point.
  def query(sql, params = [])
    with_pooled_connection { |connection| connection.query(sql, params) }
  end

  private

  def create_connection
    SQLite3::Database.new(@database_path)
  end

  # Checks a connection out, yields it, and always puts it back afterwards.
  def with_pooled_connection
    connection = @pool.pop
    begin
      yield connection
    ensure
      @pool << connection
    end
  end
end

# 使用连接池
# pool = DatabasePool.new("example.db")
# pool.execute("INSERT INTO users (name, email) VALUES (?, ?)", ["测试用户", "test@example.com"])

数据库配置管理

ruby
# 数据库配置管理
require 'yaml'
require 'erb'

# Reads database settings from a YAML file (rendered through ERB first) and
# opens a connection with the driver named by the environment's adapter.
class DatabaseConfig
  # Renders +config_file+ with ERB and parses the result as YAML.
  #
  # @param config_file [String] path to the YAML configuration
  # @return [Hash] settings keyed by environment name ({} for an empty file)
  def self.load(config_file = 'config/database.yml')
    rendered = ERB.new(File.read(config_file)).result
    # safe_load keeps YAML from instantiating arbitrary objects; aliases are
    # enabled because database.yml files commonly share settings via anchors.
    YAML.safe_load(rendered, aliases: true) || {}
  end

  # Opens a connection for +environment+.
  #
  # @param environment [String, Symbol] top-level key in the config file
  # @param config_file [String] path to the YAML configuration
  # @return the driver's connection object
  # @raise [ArgumentError] when the environment is not configured or its
  #   adapter is not one of sqlite3 / mysql2 / postgresql
  def self.connect(environment = 'development', config_file = 'config/database.yml')
    env_config = load(config_file)[environment.to_s]
    unless env_config
      raise ArgumentError, "no database configuration for environment #{environment.inspect}"
    end

    # Drivers are required lazily so that only the adapter in use must be installed.
    case env_config['adapter']
    when 'sqlite3'
      require 'sqlite3'
      SQLite3::Database.new(env_config['database'])
    when 'mysql2'
      require 'mysql2'
      Mysql2::Client.new(
        host: env_config['host'],
        username: env_config['username'],
        password: env_config['password'],
        database: env_config['database'],
        port: env_config['port']
      )
    when 'postgresql'
      require 'pg'
      PG.connect(
        host: env_config['host'],
        port: env_config['port'],
        dbname: env_config['database'],
        user: env_config['username'],
        password: env_config['password']
      )
    else
      raise ArgumentError, "unsupported database adapter #{env_config['adapter'].inspect}"
    end
  end
end

# database.yml 配置文件示例
=begin
development:
  adapter: sqlite3
  database: db/development.sqlite3

test:
  adapter: sqlite3
  database: db/test.sqlite3

production:
  adapter: postgresql
  host: localhost
  port: 5432
  database: myapp_production
  username: myapp
  password: <%= ENV['DATABASE_PASSWORD'] %>
=end

🎯 ORM框架 - ActiveRecord

ActiveRecord基础

ruby
# ActiveRecord是Ruby on Rails的ORM框架
# 首先安装activerecord gem: gem install activerecord

require 'active_record'
require 'sqlite3'

# 配置数据库连接
# Point ActiveRecord at the SQLite file used by the examples.
connection_settings = { adapter: 'sqlite3', database: 'example.db' }
ActiveRecord::Base.establish_connection(connection_settings)

# 定义模型
# User model for the users table.
class User < ActiveRecord::Base
  # Required by the association example further down, which calls user.posts.
  has_many :posts
end

# Post model; every post belongs to one user.
class Post < ActiveRecord::Base
  belongs_to :user
end

# 创建表(使用迁移)
# Migration that creates the users table with a unique index on email.
class CreateUsers < ActiveRecord::Migration[7.0]
  def change
    create_table :users do |table|
      table.string :name, null: false
      table.string :email, null: false
      table.integer :age
      table.timestamps

      # Declared with the table so the unique email index is created with it.
      table.index :email, unique: true
    end
  end
end

# Migration that creates the posts table, each row referencing a user.
class CreatePosts < ActiveRecord::Migration[7.0]
  def change
    create_table :posts do |t|
      t.string :title, null: false
      t.text :content
      # references adds the user_id column together with its index and the
      # foreign key, so a separate add_index on user_id would try to create
      # the same index a second time.
      t.references :user, null: false, foreign_key: true
      t.timestamps
    end
  end
end

# Run the migrations.
# CreateUsers.new.change
# CreatePosts.new.change

# CRUD operations.
# Create a record.
user = User.create(name: "张三", email: "zhangsan@example.com", age: 25)
puts "创建用户: #{user.name}"

# Read records.
all_users = User.all
puts "所有用户数: #{all_users.count}"

# Look up by primary key.
user_by_id = User.find(1)
puts "用户: #{user_by_id.name}"

# Look up by attribute.
user_by_email = User.find_by(email: "zhangsan@example.com")
puts "邮箱用户: #{user_by_email.name}"

# Conditional query with a bound parameter.
young_users = User.where("age < ?", 30)
young_users.each { |user| puts "年轻用户: #{user.name}" }

# Update a record.
user.update(name: "张三丰", age: 30)
puts "更新后用户: #{user.name}, 年龄: #{user.age}"

# Delete a record.
# user.destroy

# Association query: create a post for the user, then read the user's posts.
post = Post.create(title: "第一篇文章", content: "这是内容", user: user)
user_posts = user.posts
puts "用户文章数: #{user_posts.count}"

ActiveRecord高级特性

ruby
# 高级查询
# User model with associations, reusable scopes and basic validations.
class User < ActiveRecord::Base
  has_many :posts
  has_many :comments
  # adults: users whose age is at least 18.
  scope :adults, -> { where("age >= ?", 18) }
  # recent: ordered by created_at, newest first.
  scope :recent, -> { order(created_at: :desc) }
  
  # name and email are required, email must be unique, age must be above 0.
  validates :name, presence: true
  validates :email, presence: true, uniqueness: true
  validates :age, numericality: { greater_than: 0 }
end

# Post model: belongs to a user and has many comments.
class Post < ActiveRecord::Base
  belongs_to :user
  has_many :comments
  # published: posts whose published_at is set.
  # NOTE(review): the CreatePosts migration shown earlier does not add a
  # published_at column -- confirm it exists before using this scope.
  scope :published, -> { where.not(published_at: nil) }
  
  # A post needs a title and an owning user.
  validates :title, presence: true
  validates :user, presence: true
end

# Comment model: written by a user on a post; content is required.
class Comment < ActiveRecord::Base
  belongs_to :user
  belongs_to :post
  
  validates :content, presence: true
end

# Scope queries.
adult_users = User.adults
recent_users = User.recent.limit(10)

# Association query: load the user together with its posts.
user = User.includes(:posts).find(1)
user.posts.each { |post| puts post.title }

# Join query: posts whose author is 25 years old.
posts_with_users = Post.joins(:user).where(users: { age: 25 })
posts_with_users.each { |post| puts "#{post.user.name}: #{post.title}" }

# Aggregate queries.
total_users = User.count
average_age = User.average(:age)
max_age = User.maximum(:age)
min_age = User.minimum(:age)

# Grouped query: number of users per age.
users_by_age = User.group(:age).count
puts users_by_age

# Raw SQL: pass the statement and its values together as one array, which is
# the documented way to have ? placeholders sanitized and substituted. The
# separate second argument is an adapter-level bind list whose placeholder
# syntax differs between databases.
users = User.find_by_sql(["SELECT * FROM users WHERE age > ?", 20])

# Transaction: create! raises on failure, which rolls the whole block back.
ActiveRecord::Base.transaction do
  user = User.create!(name: "新用户", email: "newuser@example.com", age: 25)
  post = user.posts.create!(title: "新文章", content: "内容")
end

# 回调
# Lifecycle callbacks on the User model.
class User < ActiveRecord::Base
  before_create :set_default_age
  after_create :send_welcome_email
  before_update :log_changes
  
  private
  
  # Fills in an age of 18 when none was given.
  def set_default_age
    self.age ||= 18
  end
  
  # Placeholder for sending a welcome mail; it only prints a message here.
  def send_welcome_email
    puts "发送欢迎邮件给 #{email}"
  end
  
  # Prints a notice before the record is updated.
  def log_changes
    puts "用户 #{name} 的信息已更新"
  end
end

🔧 数据库操作最佳实践

数据验证和安全

ruby
# 模型验证
# User model demonstrating the built-in validation helpers and custom
# validation methods.
class User < ActiveRecord::Base
  # Shared by the format validation and the custom email_format check.
  EMAIL_FORMAT = /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i

  # Basic validations.
  validates :name, presence: true, length: { minimum: 2, maximum: 50 }
  validates :email, presence: true, uniqueness: { case_sensitive: false }
  validates :age, presence: true, numericality: {
    only_integer: true,
    greater_than: 0,
    less_than: 150
  }

  # Custom validation methods (defined below).
  validate :email_format
  validate :name_not_reserved

  # Format validation.
  validates :email, format: {
    with: EMAIL_FORMAT,
    message: "邮箱格式不正确"
  }

  # Conditional validation: phone is only required for active users.
  validates :phone, presence: true, if: :phone_required?

  # Exclusion / inclusion validations. The helpers are spelled
  # exclusion: / inclusion: (or validates_exclusion_of / validates_inclusion_of);
  # validates_exclusion and validates_inclusion are not defined.
  validates :name, exclusion: { in: %w(admin administrator root) }
  validates :status, inclusion: { in: %w(active inactive suspended) }

  private

  # Adds an error when email does not look like an address.
  def email_format
    # A missing email is already reported by the presence validation;
    # calling match? on nil would raise instead of adding an error.
    return if email.nil?

    errors.add(:email, "格式不正确") unless email.match?(EMAIL_FORMAT)
  end

  # Rejects a small list of reserved names, compared case-insensitively.
  def name_not_reserved
    reserved_names = %w(admin root user guest)
    errors.add(:name, "不能使用保留名称") if reserved_names.include?(name&.downcase)
  end

  def phone_required?
    status == 'active'
  end
end

# 使用验证
# Build a record that breaks several rules and print each validation message.
user = User.new(name: "a", email: "invalid-email", age: -5)
if user.invalid?
  puts "验证错误:"
  user.errors.full_messages.each do |message|
    puts "  #{message}"
  end
end

SQL注入防护

ruby
# 安全的数据库操作
# Query helpers that keep user-supplied values out of the SQL text.
class UserService
  class << self
    # Parameterized condition: the value is bound to the ? placeholder.
    def find_by_email_safe(email)
      User.where("email = ?", email)
    end

    # Hash conditions are handed to where unchanged.
    def find_by_conditions(conditions)
      User.where(conditions)
    end

    # LIKE search; wildcard characters in the keyword are escaped first.
    def search_users_safe(keyword)
      User.where("name LIKE ?", "%#{User.sanitize_sql_like(keyword)}%")
    end

    # Avoid string interpolation in conditions. Dangerous counter-example:
    # def self.find_by_email_dangerous(email)
    #   User.where("email = '#{email}'")  # open to SQL injection
    # end

    # Bulk insert in one call.
    def bulk_insert_safe(users_data)
      User.insert_all(users_data, record_timestamps: true)
    end

    # Builds a query from the recognised filter keys (name, min_age, max_age,
    # status); any other key is ignored.
    def dynamic_search(filters)
      filters.reduce(User.all) do |relation, (key, value)|
        case key.to_sym
        when :name
          relation.where("name LIKE ?", "%#{User.sanitize_sql_like(value)}%")
        when :min_age then relation.where("age >= ?", value)
        when :max_age then relation.where("age <= ?", value)
        when :status then relation.where(status: value)
        else relation
        end
      end
    end
  end
end

# 使用安全的查询方法
# Each call below goes through one of the safe query helpers: a bound
# parameter, hash conditions, an escaped LIKE keyword, and a filter hash.
users = UserService.find_by_email_safe("user@example.com")
users = UserService.find_by_conditions(name: "张三", age: 25)
users = UserService.search_users_safe("张")
users = UserService.dynamic_search(name: "张", min_age: 18, status: "active")

🎯 实用数据库操作示例

用户管理系统

ruby
# 用户管理系统的数据库操作
require 'time' # Time#iso8601 is provided by the stdlib "time" library

# SQLite-backed user store with password authentication and session tokens.
# Rows are returned as hashes (results_as_hash is enabled in the constructor).
class UserDatabase
  # Columns update_user may change. Column names cannot be bound as SQL
  # parameters, so they are checked against this list before being placed
  # in the statement text.
  UPDATABLE_COLUMNS = %w[
    username email password_hash first_name last_name age status
  ].freeze

  def initialize(database_path = "users.db")
    @db = SQLite3::Database.new(database_path)
    @db.results_as_hash = true

    create_tables
  end

  # Creates the users and user_sessions tables when they do not exist yet.
  def create_tables
    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username VARCHAR(50) UNIQUE NOT NULL,
        email VARCHAR(100) UNIQUE NOT NULL,
        password_hash VARCHAR(255) NOT NULL,
        first_name VARCHAR(50),
        last_name VARCHAR(50),
        age INTEGER,
        status VARCHAR(20) DEFAULT 'active',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    SQL

    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS user_sessions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        session_token VARCHAR(255) UNIQUE NOT NULL,
        expires_at TIMESTAMP NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users (id)
      )
    SQL
  end

  # Inserts a user with a BCrypt hash of +password+.
  #
  # @return [Integer] id of the new row
  # @raise [RuntimeError] when the username or email is already taken
  # @raise [SQLite3::ConstraintException] for any other constraint failure
  def create_user(username, email, password, first_name = nil, last_name = nil, age = nil)
    password_hash = BCrypt::Password.create(password)

    @db.execute(
      "INSERT INTO users (username, email, password_hash, first_name, last_name, age) VALUES (?, ?, ?, ?, ?, ?)",
      [username, email, password_hash, first_name, last_name, age]
    )

    @db.last_insert_row_id
  rescue SQLite3::ConstraintException => e
    raise "用户名或邮箱已存在" if e.message.include?("UNIQUE constraint failed")
    raise
  end

  # Checks a username/password pair against active users.
  #
  # @return [Hash, nil] the user row on success, nil otherwise
  def authenticate_user(username, password)
    user = @db.execute("SELECT * FROM users WHERE username = ? AND status = 'active'", [username]).first
    return nil unless user

    user if BCrypt::Password.new(user['password_hash']) == password
  end

  # @return [Hash, nil] the user row with the given id
  def find_user(id)
    @db.execute("SELECT * FROM users WHERE id = ?", [id]).first
  end

  # Updates the given columns of one user and refreshes updated_at.
  #
  # @param updates [Hash] column name (String or Symbol) => new value; an
  #   empty hash only refreshes updated_at
  # @raise [ArgumentError] when a key is not listed in UPDATABLE_COLUMNS
  def update_user(id, updates)
    columns = updates.keys.map(&:to_s)
    rejected = columns - UPDATABLE_COLUMNS
    raise ArgumentError, "unknown user column(s): #{rejected.join(', ')}" unless rejected.empty?

    assignments = columns.map { |column| "#{column} = ?" } << "updated_at = CURRENT_TIMESTAMP"
    @db.execute("UPDATE users SET #{assignments.join(', ')} WHERE id = ?", updates.values + [id])
  end

  # Soft delete: the row stays, its status becomes 'deleted'.
  def delete_user(id)
    @db.execute("UPDATE users SET status = 'deleted', updated_at = CURRENT_TIMESTAMP WHERE id = ?", [id])
  end

  # Creates a session for +user_id+ and returns its random token.
  def create_session(user_id, expires_in_hours = 24)
    session_token = SecureRandom.urlsafe_base64
    # Stored in UTC: validate_session compares the timestamps as strings,
    # which is only meaningful when both sides use the same UTC offset.
    expires_at = (Time.now + (expires_in_hours * 3600)).utc

    @db.execute(
      "INSERT INTO user_sessions (user_id, session_token, expires_at) VALUES (?, ?, ?)",
      [user_id, session_token, expires_at.iso8601]
    )

    session_token
  end

  # @return [Hash, nil] the user owning an unexpired session with this token
  def validate_session(session_token)
    @db.execute(
      "SELECT u.* FROM user_sessions s JOIN users u ON s.user_id = u.id WHERE s.session_token = ? AND s.expires_at > ?",
      [session_token, Time.now.utc.iso8601]
    ).first
  end

  # Removes the session with this token.
  def delete_session(session_token)
    @db.execute("DELETE FROM user_sessions WHERE session_token = ?", [session_token])
  end

  # All active users, newest first.
  def active_users
    @db.execute("SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC")
  end

  # Active users whose username, email, first or last name contains +keyword+.
  def search_users(keyword, limit = 10)
    pattern = "%#{keyword}%"
    @db.execute(
      "SELECT * FROM users WHERE status = 'active' AND (username LIKE ? OR email LIKE ? OR first_name LIKE ? OR last_name LIKE ?) ORDER BY created_at DESC LIMIT ?",
      [pattern, pattern, pattern, pattern, limit]
    )
  end

  def close
    @db.close
  end
end

# 使用用户数据库
# require 'bcrypt'
# require 'securerandom'
# 
# user_db = UserDatabase.new
# 
# # 创建用户
# user_id = user_db.create_user("zhangsan", "zhangsan@example.com", "password123", "张", "三", 25)
# puts "创建用户ID: #{user_id}"
# 
# # 用户登录
# user = user_db.authenticate_user("zhangsan", "password123")
# if user
#   puts "登录成功: #{user['username']}"
#   
#   # 创建会话
#   session_token = user_db.create_session(user['id'])
#   puts "会话令牌: #{session_token}"
#   
#   # 验证会话
#   session_user = user_db.validate_session(session_token)
#   puts "会话用户: #{session_user['username']}" if session_user
# else
#   puts "登录失败"
# end
# 
# # 搜索用户
# users = user_db.search_users("张")
# users.each { |u| puts "搜索结果: #{u['username']}" }
# 
# user_db.close

内容管理系统

ruby
# 内容管理系统的数据库操作
# SQLite-backed store for a small content management system: categories,
# articles, tags and comments. Rows are returned as hashes.
class ContentDatabase
  def initialize(database_path = "content.db")
    @db = SQLite3::Database.new(database_path)
    @db.results_as_hash = true

    create_tables
  end

  # Creates all tables when they do not exist yet.
  def create_tables
    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS categories (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name VARCHAR(100) UNIQUE NOT NULL,
        description TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    SQL

    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS articles (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title VARCHAR(255) NOT NULL,
        content TEXT NOT NULL,
        summary TEXT,
        author_id INTEGER NOT NULL,
        category_id INTEGER,
        status VARCHAR(20) DEFAULT 'draft',
        view_count INTEGER DEFAULT 0,
        published_at TIMESTAMP,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (author_id) REFERENCES users (id),
        FOREIGN KEY (category_id) REFERENCES categories (id)
      )
    SQL

    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS tags (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name VARCHAR(50) UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    SQL

    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS article_tags (
        article_id INTEGER,
        tag_id INTEGER,
        PRIMARY KEY (article_id, tag_id),
        FOREIGN KEY (article_id) REFERENCES articles (id) ON DELETE CASCADE,
        FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE
      )
    SQL

    @db.execute <<-SQL
      CREATE TABLE IF NOT EXISTS comments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        article_id INTEGER NOT NULL,
        author_name VARCHAR(100) NOT NULL,
        author_email VARCHAR(100),
        content TEXT NOT NULL,
        status VARCHAR(20) DEFAULT 'pending',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (article_id) REFERENCES articles (id) ON DELETE CASCADE
      )
    SQL
  end

  # Inserts a category.
  #
  # @return [Integer] id of the new row
  # @raise [RuntimeError] when the name is already taken
  # @raise [SQLite3::ConstraintException] for any other constraint failure
  def create_category(name, description = nil)
    @db.execute("INSERT INTO categories (name, description) VALUES (?, ?)", [name, description])
    @db.last_insert_row_id
  rescue SQLite3::ConstraintException => e
    # Only a UNIQUE violation means the name exists; other constraint
    # failures (e.g. a missing name) are passed through unchanged.
    raise "分类名称已存在" if e.message.include?("UNIQUE constraint failed")
    raise
  end

  # Inserts an article; the summary is the first 200 characters of the
  # content (followed by "...") when the content is longer than that.
  #
  # @return [Integer] id of the new row
  def create_article(title, content, author_id, category_id = nil, status = 'draft')
    summary = content.length > 200 ? content[0, 200] + "..." : content

    @db.execute(
      "INSERT INTO articles (title, content, summary, author_id, category_id, status) VALUES (?, ?, ?, ?, ?, ?)",
      [title, content, summary, author_id, category_id, status]
    )

    @db.last_insert_row_id
  end

  # Marks an article as published and stamps published_at / updated_at.
  def publish_article(article_id)
    @db.execute(
      "UPDATE articles SET status = 'published', published_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
      [article_id]
    )
  end

  # One page of articles with the given status, newest first, including the
  # category name.
  def list_articles(status = 'published', limit = 10, offset = 0)
    @db.execute(
      "SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.status = ? ORDER BY a.created_at DESC LIMIT ? OFFSET ?",
      [status, limit, offset]
    )
  end

  # Fetches one article with its category name and tag names, and bumps its
  # view counter. The returned row carries the view_count read before the bump.
  #
  # @return [Hash, nil] the article row plus a 'tags' array, or nil if absent
  def get_article(article_id)
    result = @db.execute(
      "SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.id = ?",
      [article_id]
    ).first

    return nil unless result

    # Count this read.
    @db.execute("UPDATE articles SET view_count = view_count + 1 WHERE id = ?", [article_id])

    # Attach the tag names.
    tags = @db.execute(
      "SELECT t.name FROM tags t JOIN article_tags at ON t.id = at.tag_id WHERE at.article_id = ?",
      [article_id]
    ).map { |row| row['name'] }

    result['tags'] = tags
    result
  end

  # Published articles whose title or content contains +keyword+.
  def search_articles(keyword, limit = 10)
    @db.execute(
      "SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.status = 'published' AND (a.title LIKE ? OR a.content LIKE ?) ORDER BY a.created_at DESC LIMIT ?",
      ["%#{keyword}%", "%#{keyword}%", limit]
    )
  end

  # Links a tag to an article, creating the tag first when it is new.
  def add_tag_to_article(article_id, tag_name)
    tag_result = @db.execute("SELECT id FROM tags WHERE name = ?", [tag_name]).first
    tag_id = if tag_result
               tag_result['id']
             else
               @db.execute("INSERT INTO tags (name) VALUES (?)", [tag_name])
               @db.last_insert_row_id
             end

    # OR IGNORE makes linking the same tag twice a no-op.
    @db.execute(
      "INSERT OR IGNORE INTO article_tags (article_id, tag_id) VALUES (?, ?)",
      [article_id, tag_id]
    )
  end

  # Inserts a comment (status defaults to 'pending' in the schema).
  #
  # @return [Integer] id of the new row
  def create_comment(article_id, author_name, author_email, content)
    @db.execute(
      "INSERT INTO comments (article_id, author_name, author_email, content) VALUES (?, ?, ?, ?)",
      [article_id, author_name, author_email, content]
    )

    @db.last_insert_row_id
  end

  # Comments of an article with the given status, oldest first.
  def get_article_comments(article_id, status = 'approved')
    @db.execute(
      "SELECT * FROM comments WHERE article_id = ? AND status = ? ORDER BY created_at ASC",
      [article_id, status]
    )
  end

  # Sets the moderation status of a comment.
  def moderate_comment(comment_id, status)
    @db.execute("UPDATE comments SET status = ? WHERE id = ?", [status, comment_id])
  end

  # Published articles with the highest view counts.
  def popular_articles(limit = 5)
    @db.execute(
      "SELECT * FROM articles WHERE status = 'published' ORDER BY view_count DESC LIMIT ?",
      [limit]
    )
  end

  # Published articles of one category, newest first.
  def articles_by_category(category_id, limit = 10)
    @db.execute(
      "SELECT a.*, c.name as category_name FROM articles a JOIN categories c ON a.category_id = c.id WHERE a.category_id = ? AND a.status = 'published' ORDER BY a.created_at DESC LIMIT ?",
      [category_id, limit]
    )
  end

  def close
    @db.close
  end
end

# 使用内容数据库
# content_db = ContentDatabase.new
# 
# # 创建分类
# tech_category = content_db.create_category("技术", "技术相关文章")
# puts "创建分类ID: #{tech_category}"
# 
# # 创建文章
# article_id = content_db.create_article(
#   "Ruby数据库访问教程",
#   "这是关于Ruby数据库访问的详细教程...",
#   1,  # author_id
#   tech_category
# )
# puts "创建文章ID: #{article_id}"
# 
# # 发布文章
# content_db.publish_article(article_id)
# 
# # 添加标签
# content_db.add_tag_to_article(article_id, "Ruby")
# content_db.add_tag_to_article(article_id, "数据库")
# 
# # 获取文章
# article = content_db.get_article(article_id)
# puts "文章标题: #{article['title']}" if article
# puts "标签: #{article['tags']}" if article
# 
# # 搜索文章
# results = content_db.search_articles("数据库")
# results.each { |a| puts "搜索结果: #{a['title']}" }
# 
# content_db.close

📊 性能优化

查询优化

ruby
# 数据库查询优化技巧
# Collection of query-tuning examples. Every method takes the database
# handle as its first argument and returns what the handle returns.
class OptimizedQueries
  # Creates the lookup indexes used by the examples (no-op when present).
  def self.create_indexes(db)
    index_targets = {
      'idx_users_email' => 'users(email)',
      'idx_users_status' => 'users(status)',
      'idx_articles_status' => 'articles(status)',
      'idx_articles_category' => 'articles(category_id)',
      'idx_articles_published' => 'articles(published_at)'
    }
    outcomes = index_targets.map do |index_name, target|
      db.execute("CREATE INDEX IF NOT EXISTS #{index_name} ON #{target}")
    end
    outcomes.last
  end

  # Inserts all rows inside one transaction instead of one per statement.
  def self.bulk_insert_users(db, users_data)
    insert_sql = "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)"
    db.transaction do
      users_data.each { |row| db.execute(insert_sql, row) }
    end
  end

  # One page of published articles; +page+ is 1-based.
  def self.paginated_articles(db, page = 1, per_page = 10)
    skipped = (page - 1) * per_page
    page_sql = "SELECT * FROM articles WHERE status = 'published' ORDER BY published_at DESC LIMIT ? OFFSET ?"
    db.execute(page_sql, [per_page, skipped])
  end

  # Fetches articles together with the author name in a single JOIN.
  def self.articles_with_authors(db, limit = 10)
    join_sql = "SELECT a.*, u.username as author_name FROM articles a JOIN users u ON a.author_id = u.id WHERE a.status = 'published' ORDER BY a.published_at DESC LIMIT ?"
    db.execute(join_sql, [limit])
  end

  # Uses EXISTS rather than IN to find users with published articles.
  def self.users_with_published_articles(db)
    exists_sql = "SELECT * FROM users u WHERE EXISTS (SELECT 1 FROM articles a WHERE a.author_id = u.id AND a.status = 'published')"
    db.execute(exists_sql)
  end

  # Selects only the columns a listing needs.
  def self.user_listings(db)
    listing_sql = "SELECT id, username, email FROM users WHERE status = 'active'"
    db.execute(listing_sql)
  end
end

# 连接池优化
# Connection pool that never blocks: when every pooled connection is checked
# out, an extra connection is opened, and surplus connections are closed
# again when they are released.
class OptimizedDatabasePool
  # @param adapter [Symbol] :sqlite3, :mysql2 or :postgresql
  # @param config [Hash] settings handed to the driver
  # @param pool_size [Integer] number of connections kept open
  # @raise [ArgumentError] when +adapter+ is not supported
  def initialize(adapter, config, pool_size = 5)
    @adapter = adapter
    @config = config
    @pool_size = pool_size
    @pool = Queue.new
    @mutex = Mutex.new

    # Open the pooled connections up front.
    @pool_size.times { @pool << create_connection }
  end

  # Yields a connection and hands it back afterwards, even when the block
  # raises. Returns the block's value.
  def with_connection
    conn = acquire_connection
    begin
      yield conn
    ensure
      release_connection(conn)
    end
  end

  private

  def create_connection
    case @adapter
    when :sqlite3
      SQLite3::Database.new(@config[:database])
    when :mysql2
      Mysql2::Client.new(@config)
    when :postgresql
      PG.connect(@config)
    else
      # Without this branch the pool would silently be filled with nil.
      raise ArgumentError, "unsupported database adapter: #{@adapter.inspect}"
    end
  end

  def acquire_connection
    @pool.pop(true) # non-blocking; raises ThreadError when the pool is empty
  rescue ThreadError
    # Pool exhausted: open an additional connection.
    create_connection
  end

  def release_connection(conn)
    # The size check and the push must happen together, otherwise two threads
    # releasing at once could both pass the check and overfill the pool.
    kept = @mutex.synchronize do
      if @pool.size < @pool_size
        @pool << conn
        true
      end
    end
    conn.close unless kept # close surplus connections
  end
end

缓存策略

ruby
# 数据库查询缓存
require 'redis'

require 'json' # JSON.parse / #to_json are used below

# Read-through cache for query results, stored in Redis as JSON.
class QueryCache
  # Number of keys removed per DEL call in invalidate_pattern.
  DELETE_BATCH_SIZE = 500

  def initialize(redis_url = 'redis://localhost:6379')
    @redis = Redis.new(url: redis_url)
    @default_ttl = 3600 # seconds (one hour)
  end

  # Returns the cached value for +key+; on a miss, runs the block, stores its
  # result for +ttl+ seconds and returns it.
  #
  # The value always comes back in its JSON-decoded form (string hash keys),
  # on a miss as well as on a hit, so callers see the same shape every time.
  def fetch(key, ttl = nil)
    ttl ||= @default_ttl
    payload = @redis.get(key)

    unless payload
      payload = yield.to_json
      @redis.setex(key, ttl, payload)
    end

    JSON.parse(payload)
  end

  # Removes one cached entry.
  def invalidate(key)
    @redis.del(key)
  end

  # Removes every entry whose key matches the glob +pattern+.
  # Keys are walked incrementally with SCAN instead of KEYS, which would
  # block the Redis server while it lists the whole keyspace.
  #
  # @return [Integer] number of keys deleted
  def invalidate_pattern(pattern)
    deleted = 0
    @redis.scan_each(match: pattern).each_slice(DELETE_BATCH_SIZE) do |batch|
      deleted += @redis.del(*batch)
    end
    deleted
  end
end

# 使用查询缓存
# cache = QueryCache.new
# 
# # 缓存用户信息
# user = cache.fetch("user:#{user_id}") do
#   db.execute("SELECT * FROM users WHERE id = ?", [user_id]).first
# end
# 
# # 缓存文章列表
# articles = cache.fetch("articles:page:#{page}:limit:#{limit}") do
#   db.execute("SELECT * FROM articles LIMIT ? OFFSET ?", [limit, (page-1)*limit])
# end
# 
# # 当数据更新时清除缓存
# cache.invalidate("user:#{user_id}")
# cache.invalidate_pattern("articles:*")

🛡️ 数据库安全最佳实践

1. 连接安全

ruby
# 安全的数据库连接配置
# Builds connection settings with credentials taken from environment
# variables and encrypted transport requested where the driver supports it.
class SecureDatabaseConnection
  # Settings for a SQLite database file.
  # NOTE(review): foreign_keys and journal_mode are only carried in this
  # hash; nothing here applies them -- confirm the consumer issues the
  # matching PRAGMAs.
  def self.sqlite3_config(database_path)
    {
      database: database_path,
      # Enable foreign key constraints.
      foreign_keys: true,
      # WAL journal mode for better concurrency.
      journal_mode: 'wal'
    }
  end

  # Settings for Mysql2::Client.new.
  #
  # @raise [ArgumentError] when DB_PORT is set but is not an integer
  def self.mysql_config
    {
      host: ENV.fetch('DB_HOST', 'localhost'),
      username: ENV['DB_USERNAME'],
      password: ENV['DB_PASSWORD'],
      database: ENV['DB_NAME'],
      # Always an Integer, whether it comes from the environment or the default.
      port: Integer(ENV.fetch('DB_PORT', 3306)),
      # Require TLS. mysql2 spells this mode :required; :require is not one
      # of its modes, so TLS would not be enforced with it.
      ssl_mode: :required,
      # Connection pool settings.
      pool: 5,
      reconnect: true
    }
  end

  # Settings for PG.connect. Only libpq connection options are included;
  # libpq rejects options it does not know (such as a pool size), so
  # pooling has to be configured by the application.
  #
  # @raise [ArgumentError] when DB_PORT is set but is not an integer
  def self.postgresql_config
    {
      host: ENV.fetch('DB_HOST', 'localhost'),
      user: ENV['DB_USERNAME'],
      password: ENV['DB_PASSWORD'],
      dbname: ENV['DB_NAME'],
      port: Integer(ENV.fetch('DB_PORT', 5432)),
      # Require TLS.
      sslmode: 'require'
    }
  end
end

# 环境变量配置示例
=begin
# .env文件
DB_HOST=localhost
DB_USERNAME=myapp
DB_PASSWORD=secure_password
DB_NAME=myapp_production
DB_PORT=5432
=end

2. 数据保护

ruby
# 敏感数据加密
require 'openssl'
require 'base64'

# AES-256-CBC encryption helpers for sensitive column values.
# The output format is Base64(iv + ciphertext); the AES key is the SHA-256
# digest of the given key string.
# NOTE(review): CBC provides no integrity check -- tampered ciphertext is not
# detected. Consider an authenticated mode if that matters.
class DataEncryption
  CIPHER_NAME = 'AES-256-CBC'
  IV_LENGTH = 16 # bytes; size of the IV prepended to the ciphertext

  # Encrypts +data+ and returns the Base64 text to store.
  #
  # @param data [String] plaintext
  # @param key [String] secret; defaults to ENV['ENCRYPTION_KEY']
  # @raise [ArgumentError] when the key is nil or empty
  def self.encrypt(data, key = ENV['ENCRYPTION_KEY'])
    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.encrypt
    cipher.key = derive_key(key)
    iv = cipher.random_iv
    # Skip update for empty input; final still emits the padding block.
    body = data.empty? ? cipher.final : cipher.update(data) + cipher.final
    Base64.encode64(iv + body)
  end

  # Reverses .encrypt.
  #
  # @return [String] the plaintext, tagged as UTF-8 when its bytes are valid
  #   UTF-8 (so non-ASCII text compares equal to the original), otherwise
  #   the raw binary string
  # @raise [ArgumentError] when the key is nil or empty
  def self.decrypt(encrypted_data, key = ENV['ENCRYPTION_KEY'])
    decoded = Base64.decode64(encrypted_data)
    iv = decoded.byteslice(0, IV_LENGTH)
    ciphertext = decoded.byteslice(IV_LENGTH, decoded.bytesize)

    cipher = OpenSSL::Cipher.new(CIPHER_NAME)
    cipher.decrypt
    cipher.key = derive_key(key)
    cipher.iv = iv
    plain = cipher.update(ciphertext) + cipher.final

    # The cipher returns binary strings; restore the text encoding.
    text = plain.dup.force_encoding(Encoding::UTF_8)
    text.valid_encoding? ? text : plain
  end

  # Turns the secret into a 32-byte AES key. OpenSSL's own digest is used so
  # that nothing beyond `require 'openssl'` is needed.
  def self.derive_key(key)
    raise ArgumentError, 'encryption key must not be empty' if key.nil? || key.empty?

    OpenSSL::Digest::SHA256.digest(key)
  end
  private_class_method :derive_key
end

# 在模型中使用加密
# Stores the ssn attribute encrypted and decrypts it on read.
class User < ActiveRecord::Base
  # Encrypts the value before it is written to the ssn attribute.
  # nil is written through unchanged so the attribute can be cleared; the
  # reader below already treats a missing value as nil.
  def ssn=(value)
    write_attribute(:ssn, value.nil? ? nil : DataEncryption.encrypt(value))
  end

  # Returns the decrypted ssn, or nil when nothing is stored.
  def ssn
    encrypted = read_attribute(:ssn)
    DataEncryption.decrypt(encrypted) if encrypted
  end
end

# 数据脱敏
# Masks personal data for display. Every method returns nil for nil input.
class DataMasking
  # Masks the local part of an email address, keeping its first and last
  # character: "user@example.com" -> "u**r@example.com".
  # A one-character local part is fully masked and a two-character one keeps
  # only its first character. Text that is not of the form local@domain is
  # returned unchanged.
  def self.mask_email(email)
    return email if email.nil?

    username, separator, domain = email.rpartition('@')
    return email if separator.empty? || username.empty?

    "#{mask_middle(username)}@#{domain}"
  end

  # Masks the middle four digits of an 11-digit phone number.
  def self.mask_phone(phone)
    return phone if phone.nil?

    phone.gsub(/(\d{3})\d{4}(\d{4})/, '\1****\2')
  end

  # Masks the eight middle characters of an 18-character ID card number.
  # The last character may be the check letter X.
  def self.mask_id_card(id_card)
    return id_card if id_card.nil?

    id_card.gsub(/(\d{6})\d{8}(\d{3}[\dXx])/, '\1********\2')
  end

  # Replaces everything between the first and last character with '*'.
  # Short inputs are handled separately because '*' * (length - 2) raises
  # for a negative count and masks nothing for zero.
  def self.mask_middle(text)
    case text.length
    when 1 then '*'
    when 2 then "#{text[0]}*"
    else "#{text[0]}#{'*' * (text.length - 2)}#{text[-1]}"
    end
  end
  private_class_method :mask_middle
end

# 使用数据脱敏
# puts DataMasking.mask_email("user@example.com")  # u**r@example.com
# puts DataMasking.mask_phone("13812345678")       # 138****5678
# puts DataMasking.mask_id_card("110101199001011234")  # 110101********1234

3. 访问控制

ruby
# 数据库访问控制
require 'json'   # Hash#to_json is used for the log entries
require 'logger' # Logger is used by both nested classes

# Examples of database access control: a restricted database account,
# query logging and slow-query monitoring.
class DatabaseAccessControl
  # Builds the SQL that creates an application account limited to
  # SELECT/INSERT/UPDATE/DELETE on myapp.*.
  #
  # The statements are only returned, not executed; running them requires
  # an administrator connection.
  #
  # @return [String] the SQL statements
  def self.setup_database_users
    <<~SQL
      CREATE USER 'app_user'@'localhost' IDENTIFIED BY 'strong_password';
      GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.* TO 'app_user'@'localhost';
      FLUSH PRIVILEGES;
    SQL
  end

  # Writes one JSON entry per query to a log file.
  class QueryLogger
    def initialize(log_file = 'db_queries.log')
      @logger = Logger.new(log_file)
    end

    # Logs the statement, its parameters and who issued it.
    def log_query(sql, params = [], user_id = nil, ip_address = nil)
      @logger.info({
        timestamp: Time.now,
        sql: sql,
        params: params,
        user_id: user_id,
        ip_address: ip_address
      }.to_json)
    end
  end

  # Logs queries that take longer than a threshold to slow_queries.log.
  class SlowQueryMonitor
    # @param threshold [Numeric] duration in seconds above which a query is logged
    def initialize(threshold = 1.0)
      @threshold = threshold
      @logger = Logger.new('slow_queries.log')
    end

    # Runs the block (the query) and returns its result; logs a warning
    # when the block took longer than the threshold.
    def monitor_query(sql, params = [])
      started_at = Time.now # wall-clock time, for the log entry only
      # The duration comes from the monotonic clock so that system clock
      # adjustments cannot distort it.
      clock_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      result = yield
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - clock_start

      if duration > @threshold
        @logger.warn({
          sql: sql,
          params: params,
          duration: duration,
          timestamp: started_at
        }.to_json)
      end

      result
    end
  end
end

📚 下一步学习

掌握了Ruby数据库访问后,建议继续学习:

继续您的Ruby学习之旅吧!

本站内容仅供学习和研究使用。