Ruby 数据库访问
数据库是现代应用程序的核心组件,几乎所有的应用程序都需要与数据库进行交互来存储和检索数据。Ruby提供了多种方式来访问数据库,从原生的数据库驱动到高级的ORM框架。本章将详细介绍Ruby中数据库访问的各种方法和最佳实践。
🎯 数据库访问基础
数据库驱动程序
Ruby通过数据库驱动程序与各种数据库系统进行通信。每个数据库系统都有对应的Ruby驱动程序。
ruby
# SQLite数据库访问
# 首先安装sqlite3 gem: gem install sqlite3
require 'sqlite3'
# 连接数据库(如果不存在会自动创建)
db = SQLite3::Database.new("example.db")
# 执行SQL语句
db.execute <<-SQL
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
SQL
# 插入数据
db.execute("INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
["张三", "zhangsan@example.com", 25])
# 查询数据
results = db.execute("SELECT * FROM users")
results.each do |row|
puts "ID: #{row[0]}, 姓名: #{row[1]}, 邮箱: #{row[2]}, 年龄: #{row[3]}"
end
# 使用哈希模式获取结果
db.results_as_hash = true
results = db.execute("SELECT * FROM users")
results.each do |row|
puts "ID: #{row['id']}, 姓名: #{row['name']}, 邮箱: #{row['email']}"
end
# 关闭数据库连接
db.closeMySQL数据库访问
ruby
# MySQL数据库访问
# 首先安装mysql2 gem: gem install mysql2
require 'mysql2'
# 连接MySQL数据库
client = Mysql2::Client.new(
host: "localhost",
username: "root",
password: "password",
database: "myapp_development"
)
# 创建表
client.query <<-SQL
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
category VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
SQL
# 插入数据
client.query("INSERT INTO products (name, price, category) VALUES ('苹果', 5.50, '水果')")
# 使用参数化查询防止SQL注入
client.query("INSERT INTO products (name, price, category) VALUES (?, ?, ?)",
["香蕉", 3.00, "水果"])
# 查询数据
results = client.query("SELECT * FROM products WHERE price > ?", [4.00])
results.each do |row|
puts "产品: #{row['name']}, 价格: #{row['price']}, 分类: #{row['category']}"
end
# 更新数据
client.query("UPDATE products SET price = ? WHERE name = ?", [6.00, "苹果"])
# 删除数据
client.query("DELETE FROM products WHERE name = ?", ["香蕉"])
# 获取查询信息
puts "受影响的行数: #{client.affected_rows}"
puts "最后插入的ID: #{client.last_id}"
# 关闭连接
client.closePostgreSQL数据库访问
ruby
# PostgreSQL数据库访问
# 首先安装pg gem: gem install pg
require 'pg'
# 连接PostgreSQL数据库
conn = PG.connect(
host: "localhost",
port: 5432,
dbname: "myapp_development",
user: "postgres",
password: "password"
)
# 创建表
conn.exec <<-SQL
CREATE TABLE IF NOT EXISTS employees (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
department VARCHAR(100),
salary NUMERIC(10, 2),
hire_date DATE DEFAULT CURRENT_DATE
)
SQL
# 插入数据
conn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
["张三", "技术部", 15000.00])
# 查询数据
result = conn.exec_params("SELECT * FROM employees WHERE salary > $1", [10000])
result.each do |row|
puts "员工: #{row['name']}, 部门: #{row['department']}, 薪资: #{row['salary']}"
end
# 更新数据
conn.exec_params("UPDATE employees SET salary = $1 WHERE name = $2", [16000.00, "张三"])
# 删除数据
conn.exec_params("DELETE FROM employees WHERE name = $1", ["张三"])
# 使用事务
conn.transaction do |txn|
txn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
["李四", "销售部", 12000.00])
txn.exec_params("INSERT INTO employees (name, department, salary) VALUES ($1, $2, $3)",
["王五", "人事部", 11000.00])
end
# 关闭连接
conn.close🏗️ 数据库连接管理
连接池
ruby
# 使用连接池管理数据库连接
require 'sqlite3'
require 'thread'
class DatabasePool
def initialize(database_path, pool_size = 5)
@database_path = database_path
@pool_size = pool_size
@pool = Queue.new
@mutex = Mutex.new
# 初始化连接池
@pool_size.times do
@pool << create_connection
end
end
def execute(sql, params = [])
conn = acquire_connection
begin
conn.execute(sql, params)
ensure
release_connection(conn)
end
end
def query(sql, params = [])
conn = acquire_connection
begin
conn.query(sql, params)
ensure
release_connection(conn)
end
end
private
def create_connection
SQLite3::Database.new(@database_path)
end
def acquire_connection
@pool.pop
end
def release_connection(conn)
@pool << conn
end
end
# 使用连接池
# pool = DatabasePool.new("example.db")
# pool.execute("INSERT INTO users (name, email) VALUES (?, ?)", ["测试用户", "test@example.com"])数据库配置管理
ruby
# 数据库配置管理
require 'yaml'
require 'erb'
class DatabaseConfig
def self.load(config_file = 'config/database.yml')
erb = ERB.new(File.read(config_file)).result
YAML.load(erb)
end
def self.connect(environment = 'development')
config = load
env_config = config[environment]
case env_config['adapter']
when 'sqlite3'
require 'sqlite3'
SQLite3::Database.new(env_config['database'])
when 'mysql2'
require 'mysql2'
Mysql2::Client.new(
host: env_config['host'],
username: env_config['username'],
password: env_config['password'],
database: env_config['database'],
port: env_config['port']
)
when 'postgresql'
require 'pg'
PG.connect(
host: env_config['host'],
port: env_config['port'],
dbname: env_config['database'],
user: env_config['username'],
password: env_config['password']
)
end
end
end
# database.yml 配置文件示例
=begin
development:
adapter: sqlite3
database: db/development.sqlite3
test:
adapter: sqlite3
database: db/test.sqlite3
production:
adapter: postgresql
host: localhost
port: 5432
database: myapp_production
username: myapp
password: <%= ENV['DATABASE_PASSWORD'] %>
=end🎯 ORM框架 - ActiveRecord
ActiveRecord基础
ruby
# ActiveRecord是Ruby on Rails的ORM框架
# 首先安装activerecord gem: gem install activerecord
require 'active_record'
require 'sqlite3'
# 配置数据库连接
ActiveRecord::Base.establish_connection(
adapter: 'sqlite3',
database: 'example.db'
)
# 定义模型
class User < ActiveRecord::Base
end
class Post < ActiveRecord::Base
belongs_to :user
end
# 创建表(使用迁移)
class CreateUsers < ActiveRecord::Migration[7.0]
def change
create_table :users do |t|
t.string :name, null: false
t.string :email, null: false
t.integer :age
t.timestamps
end
add_index :users, :email, unique: true
end
end
class CreatePosts < ActiveRecord::Migration[7.0]
def change
create_table :posts do |t|
t.string :title, null: false
t.text :content
t.references :user, null: false, foreign_key: true
t.timestamps
end
add_index :posts, :user_id
end
end
# 运行迁移
# CreateUsers.new.change
# CreatePosts.new.change
# CRUD操作
# 创建记录
user = User.create(name: "张三", email: "zhangsan@example.com", age: 25)
puts "创建用户: #{user.name}"
# 查询记录
all_users = User.all
puts "所有用户数: #{all_users.count}"
user_by_id = User.find(1)
puts "用户: #{user_by_id.name}"
user_by_email = User.find_by(email: "zhangsan@example.com")
puts "邮箱用户: #{user_by_email.name}"
# 条件查询
young_users = User.where("age < ?", 30)
young_users.each { |user| puts "年轻用户: #{user.name}" }
# 更新记录
user.update(name: "张三丰", age: 30)
puts "更新后用户: #{user.name}, 年龄: #{user.age}"
# 删除记录
# user.destroy
# 关联查询
post = Post.create(title: "第一篇文章", content: "这是内容", user: user)
user_posts = user.posts
puts "用户文章数: #{user_posts.count}"ActiveRecord高级特性
ruby
# 高级查询
class User < ActiveRecord::Base
has_many :posts
has_many :comments
scope :adults, -> { where("age >= ?", 18) }
scope :recent, -> { order(created_at: :desc) }
validates :name, presence: true
validates :email, presence: true, uniqueness: true
validates :age, numericality: { greater_than: 0 }
end
class Post < ActiveRecord::Base
belongs_to :user
has_many :comments
scope :published, -> { where.not(published_at: nil) }
validates :title, presence: true
validates :user, presence: true
end
class Comment < ActiveRecord::Base
belongs_to :user
belongs_to :post
validates :content, presence: true
end
# 范围查询
adult_users = User.adults
recent_users = User.recent.limit(10)
# 关联查询
user = User.includes(:posts).find(1)
user.posts.each { |post| puts post.title }
# 连接查询
posts_with_users = Post.joins(:user).where(users: { age: 25 })
posts_with_users.each { |post| puts "#{post.user.name}: #{post.title}" }
# 聚合查询
total_users = User.count
average_age = User.average(:age)
max_age = User.maximum(:age)
min_age = User.minimum(:age)
# 分组查询
users_by_age = User.group(:age).count
puts users_by_age
# 原生SQL查询
users = User.find_by_sql("SELECT * FROM users WHERE age > ?", [20])
# 事务处理
ActiveRecord::Base.transaction do
user = User.create!(name: "新用户", email: "newuser@example.com", age: 25)
post = user.posts.create!(title: "新文章", content: "内容")
# 如果任何操作失败,整个事务都会回滚
end
# 回调
class User < ActiveRecord::Base
before_create :set_default_age
after_create :send_welcome_email
before_update :log_changes
private
def set_default_age
self.age ||= 18
end
def send_welcome_email
# 发送欢迎邮件
puts "发送欢迎邮件给 #{email}"
end
def log_changes
puts "用户 #{name} 的信息已更新"
end
end🔧 数据库操作最佳实践
数据验证和安全
ruby
# 模型验证
class User < ActiveRecord::Base
# 基本验证
validates :name, presence: true, length: { minimum: 2, maximum: 50 }
validates :email, presence: true, uniqueness: { case_sensitive: false }
validates :age, presence: true, numericality: {
only_integer: true,
greater_than: 0,
less_than: 150
}
# 自定义验证
validate :email_format
validate :name_not_reserved
# 格式验证
validates :email, format: {
with: /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i,
message: "邮箱格式不正确"
}
# 条件验证
validates :phone, presence: true, if: :phone_required?
# 排除验证
validates_exclusion :name, in: %w(admin administrator root)
# 包含验证
validates_inclusion :status, in: %w(active inactive suspended)
private
def email_format
unless email.match?(/\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i)
errors.add(:email, "格式不正确")
end
end
def name_not_reserved
reserved_names = %w(admin root user guest)
if reserved_names.include?(name&.downcase)
errors.add(:name, "不能使用保留名称")
end
end
def phone_required?
status == 'active'
end
end
# 使用验证
user = User.new(name: "a", email: "invalid-email", age: -5)
unless user.valid?
puts "验证错误:"
user.errors.full_messages.each { |msg| puts " #{msg}" }
endSQL注入防护
ruby
# 安全的数据库操作
class UserService
# 使用参数化查询
def self.find_by_email_safe(email)
User.where("email = ?", email)
end
# 使用哈希条件
def self.find_by_conditions(conditions)
User.where(conditions)
end
# 安全的原生SQL
def self.search_users_safe(keyword)
sanitized_keyword = User.sanitize_sql_like(keyword)
User.where("name LIKE ?", "%#{sanitized_keyword}%")
end
# 避免危险的字符串插值
# 危险的做法
# def self.find_by_email_dangerous(email)
# User.where("email = '#{email}'") # 容易受到SQL注入攻击
# end
# 批量操作
def self.bulk_insert_safe(users_data)
User.insert_all(users_data, record_timestamps: true)
end
# 安全的动态查询
def self.dynamic_search(filters)
scope = User.all
filters.each do |key, value|
case key.to_sym
when :name
scope = scope.where("name LIKE ?", "%#{User.sanitize_sql_like(value)}%")
when :min_age
scope = scope.where("age >= ?", value)
when :max_age
scope = scope.where("age <= ?", value)
when :status
scope = scope.where(status: value)
end
end
scope
end
end
# 使用安全的查询方法
users = UserService.find_by_email_safe("user@example.com")
users = UserService.find_by_conditions(name: "张三", age: 25)
users = UserService.search_users_safe("张")
users = UserService.dynamic_search(name: "张", min_age: 18, status: "active")🎯 实用数据库操作示例
用户管理系统
ruby
# 用户管理系统的数据库操作
class UserDatabase
def initialize(database_path = "users.db")
@db = SQLite3::Database.new(database_path)
@db.results_as_hash = true
create_tables
end
def create_tables
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(50),
last_name VARCHAR(50),
age INTEGER,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
SQL
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS user_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
session_token VARCHAR(255) UNIQUE NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
SQL
end
# 创建用户
def create_user(username, email, password, first_name = nil, last_name = nil, age = nil)
password_hash = BCrypt::Password.create(password)
@db.execute(
"INSERT INTO users (username, email, password_hash, first_name, last_name, age) VALUES (?, ?, ?, ?, ?, ?)",
[username, email, password_hash, first_name, last_name, age]
)
@db.last_insert_row_id
rescue SQLite3::ConstraintException => e
raise "用户名或邮箱已存在" if e.message.include?("UNIQUE constraint failed")
raise e
end
# 验证用户登录
def authenticate_user(username, password)
result = @db.execute("SELECT * FROM users WHERE username = ? AND status = 'active'", [username])
return nil if result.empty?
user = result.first
if BCrypt::Password.new(user['password_hash']) == password
user
else
nil
end
end
# 查找用户
def find_user(id)
@db.execute("SELECT * FROM users WHERE id = ?", [id]).first
end
# 更新用户信息
def update_user(id, updates)
set_clause = updates.keys.map { |key| "#{key} = ?" }.join(", ")
values = updates.values + [id]
@db.execute("UPDATE users SET #{set_clause}, updated_at = CURRENT_TIMESTAMP WHERE id = ?", values)
end
# 删除用户(软删除)
def delete_user(id)
@db.execute("UPDATE users SET status = 'deleted', updated_at = CURRENT_TIMESTAMP WHERE id = ?", [id])
end
# 创建会话
def create_session(user_id, expires_in_hours = 24)
session_token = SecureRandom.urlsafe_base64
expires_at = Time.now + (expires_in_hours * 3600)
@db.execute(
"INSERT INTO user_sessions (user_id, session_token, expires_at) VALUES (?, ?, ?)",
[user_id, session_token, expires_at.iso8601]
)
session_token
end
# 验证会话
def validate_session(session_token)
result = @db.execute(
"SELECT u.* FROM user_sessions s JOIN users u ON s.user_id = u.id WHERE s.session_token = ? AND s.expires_at > ?",
[session_token, Time.now.iso8601]
)
result.first
end
# 删除会话
def delete_session(session_token)
@db.execute("DELETE FROM user_sessions WHERE session_token = ?", [session_token])
end
# 获取所有活跃用户
def active_users
@db.execute("SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC")
end
# 按条件搜索用户
def search_users(keyword, limit = 10)
@db.execute(
"SELECT * FROM users WHERE status = 'active' AND (username LIKE ? OR email LIKE ? OR first_name LIKE ? OR last_name LIKE ?) ORDER BY created_at DESC LIMIT ?",
["%#{keyword}%", "%#{keyword}%", "%#{keyword}%", "%#{keyword}%", limit]
)
end
def close
@db.close
end
end
# 使用用户数据库
# require 'bcrypt'
# require 'securerandom'
#
# user_db = UserDatabase.new
#
# # 创建用户
# user_id = user_db.create_user("zhangsan", "zhangsan@example.com", "password123", "张", "三", 25)
# puts "创建用户ID: #{user_id}"
#
# # 用户登录
# user = user_db.authenticate_user("zhangsan", "password123")
# if user
# puts "登录成功: #{user['username']}"
#
# # 创建会话
# session_token = user_db.create_session(user['id'])
# puts "会话令牌: #{session_token}"
#
# # 验证会话
# session_user = user_db.validate_session(session_token)
# puts "会话用户: #{session_user['username']}" if session_user
# else
# puts "登录失败"
# end
#
# # 搜索用户
# users = user_db.search_users("张")
# users.each { |u| puts "搜索结果: #{u['username']}" }
#
# user_db.close内容管理系统
ruby
# 内容管理系统的数据库操作
class ContentDatabase
def initialize(database_path = "content.db")
@db = SQLite3::Database.new(database_path)
@db.results_as_hash = true
create_tables
end
def create_tables
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
SQL
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
summary TEXT,
author_id INTEGER NOT NULL,
category_id INTEGER,
status VARCHAR(20) DEFAULT 'draft',
view_count INTEGER DEFAULT 0,
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users (id),
FOREIGN KEY (category_id) REFERENCES categories (id)
)
SQL
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(50) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
SQL
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS article_tags (
article_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (article_id, tag_id),
FOREIGN KEY (article_id) REFERENCES articles (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE
)
SQL
@db.execute <<-SQL
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER NOT NULL,
author_name VARCHAR(100) NOT NULL,
author_email VARCHAR(100),
content TEXT NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (article_id) REFERENCES articles (id) ON DELETE CASCADE
)
SQL
end
# 创建分类
def create_category(name, description = nil)
@db.execute("INSERT INTO categories (name, description) VALUES (?, ?)", [name, description])
@db.last_insert_row_id
rescue SQLite3::ConstraintException
raise "分类名称已存在"
end
# 创建文章
def create_article(title, content, author_id, category_id = nil, status = 'draft')
summary = content.length > 200 ? content[0, 200] + "..." : content
@db.execute(
"INSERT INTO articles (title, content, summary, author_id, category_id, status) VALUES (?, ?, ?, ?, ?, ?)",
[title, content, summary, author_id, category_id, status]
)
@db.last_insert_row_id
end
# 发布文章
def publish_article(article_id)
@db.execute(
"UPDATE articles SET status = 'published', published_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
[article_id]
)
end
# 获取文章列表
def list_articles(status = 'published', limit = 10, offset = 0)
@db.execute(
"SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.status = ? ORDER BY a.created_at DESC LIMIT ? OFFSET ?",
[status, limit, offset]
)
end
# 获取文章详情
def get_article(article_id)
result = @db.execute(
"SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.id = ?",
[article_id]
).first
return nil unless result
# 增加浏览次数
@db.execute("UPDATE articles SET view_count = view_count + 1 WHERE id = ?", [article_id])
# 获取标签
tags = @db.execute(
"SELECT t.name FROM tags t JOIN article_tags at ON t.id = at.tag_id WHERE at.article_id = ?",
[article_id]
).map { |row| row['name'] }
result['tags'] = tags
result
end
# 搜索文章
def search_articles(keyword, limit = 10)
@db.execute(
"SELECT a.*, c.name as category_name FROM articles a LEFT JOIN categories c ON a.category_id = c.id WHERE a.status = 'published' AND (a.title LIKE ? OR a.content LIKE ?) ORDER BY a.created_at DESC LIMIT ?",
["%#{keyword}%", "%#{keyword}%", limit]
)
end
# 添加标签
def add_tag_to_article(article_id, tag_name)
# 创建标签(如果不存在)
tag_result = @db.execute("SELECT id FROM tags WHERE name = ?", [tag_name]).first
tag_id = if tag_result
tag_result['id']
else
@db.execute("INSERT INTO tags (name) VALUES (?)", [tag_name])
@db.last_insert_row_id
end
# 关联文章和标签
@db.execute(
"INSERT OR IGNORE INTO article_tags (article_id, tag_id) VALUES (?, ?)",
[article_id, tag_id]
)
end
# 创建评论
def create_comment(article_id, author_name, author_email, content)
@db.execute(
"INSERT INTO comments (article_id, author_name, author_email, content) VALUES (?, ?, ?, ?)",
[article_id, author_name, author_email, content]
)
@db.last_insert_row_id
end
# 获取文章评论
def get_article_comments(article_id, status = 'approved')
@db.execute(
"SELECT * FROM comments WHERE article_id = ? AND status = ? ORDER BY created_at ASC",
[article_id, status]
)
end
# 审核评论
def moderate_comment(comment_id, status)
@db.execute("UPDATE comments SET status = ? WHERE id = ?", [status, comment_id])
end
# 获取热门文章
def popular_articles(limit = 5)
@db.execute(
"SELECT * FROM articles WHERE status = 'published' ORDER BY view_count DESC LIMIT ?",
[limit]
)
end
# 获取分类文章
def articles_by_category(category_id, limit = 10)
@db.execute(
"SELECT a.*, c.name as category_name FROM articles a JOIN categories c ON a.category_id = c.id WHERE a.category_id = ? AND a.status = 'published' ORDER BY a.created_at DESC LIMIT ?",
[category_id, limit]
)
end
def close
@db.close
end
end
# 使用内容数据库
# content_db = ContentDatabase.new
#
# # 创建分类
# tech_category = content_db.create_category("技术", "技术相关文章")
# puts "创建分类ID: #{tech_category}"
#
# # 创建文章
# article_id = content_db.create_article(
# "Ruby数据库访问教程",
# "这是关于Ruby数据库访问的详细教程...",
# 1, # author_id
# tech_category
# )
# puts "创建文章ID: #{article_id}"
#
# # 发布文章
# content_db.publish_article(article_id)
#
# # 添加标签
# content_db.add_tag_to_article(article_id, "Ruby")
# content_db.add_tag_to_article(article_id, "数据库")
#
# # 获取文章
# article = content_db.get_article(article_id)
# puts "文章标题: #{article['title']}" if article
# puts "标签: #{article['tags']}" if article
#
# # 搜索文章
# results = content_db.search_articles("数据库")
# results.each { |a| puts "搜索结果: #{a['title']}" }
#
# content_db.close📊 性能优化
查询优化
ruby
# 数据库查询优化技巧
class OptimizedQueries
# 使用索引
def self.create_indexes(db)
db.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
db.execute("CREATE INDEX IF NOT EXISTS idx_users_status ON users(status)")
db.execute("CREATE INDEX IF NOT EXISTS idx_articles_status ON articles(status)")
db.execute("CREATE INDEX IF NOT EXISTS idx_articles_category ON articles(category_id)")
db.execute("CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published_at)")
end
# 批量插入
def self.bulk_insert_users(db, users_data)
# 使用事务提高性能
db.transaction do
users_data.each do |user_data|
db.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
user_data
)
end
end
end
# 分页查询
def self.paginated_articles(db, page = 1, per_page = 10)
offset = (page - 1) * per_page
db.execute(
"SELECT * FROM articles WHERE status = 'published' ORDER BY published_at DESC LIMIT ? OFFSET ?",
[per_page, offset]
)
end
# 预加载关联数据
def self.articles_with_authors(db, limit = 10)
db.execute(
"SELECT a.*, u.username as author_name FROM articles a JOIN users u ON a.author_id = u.id WHERE a.status = 'published' ORDER BY a.published_at DESC LIMIT ?",
[limit]
)
end
# 使用EXISTS替代IN
def self.users_with_published_articles(db)
db.execute(
"SELECT * FROM users u WHERE EXISTS (SELECT 1 FROM articles a WHERE a.author_id = u.id AND a.status = 'published')"
)
end
# 限制查询字段
def self.user_listings(db)
db.execute("SELECT id, username, email FROM users WHERE status = 'active'")
end
end
# 连接池优化
class OptimizedDatabasePool
def initialize(adapter, config, pool_size = 5)
@adapter = adapter
@config = config
@pool_size = pool_size
@pool = Queue.new
@mutex = Mutex.new
# 预创建连接
@pool_size.times do
@pool << create_connection
end
end
def with_connection
conn = acquire_connection
begin
yield conn
ensure
release_connection(conn)
end
end
private
def create_connection
case @adapter
when :sqlite3
SQLite3::Database.new(@config[:database])
when :mysql2
Mysql2::Client.new(@config)
when :postgresql
PG.connect(@config)
end
end
def acquire_connection
@pool.pop(true) # 非阻塞获取
rescue ThreadError
# 连接池耗尽时创建新连接
create_connection
end
def release_connection(conn)
if @pool.size < @pool_size
@pool << conn
else
conn.close # 关闭多余的连接
end
end
end缓存策略
ruby
# 数据库查询缓存
require 'redis'
class QueryCache
def initialize(redis_url = 'redis://localhost:6379')
@redis = Redis.new(url: redis_url)
@default_ttl = 3600 # 1小时
end
def fetch(key, ttl = nil)
ttl ||= @default_ttl
cached = @redis.get(key)
if cached
JSON.parse(cached)
else
result = yield
@redis.setex(key, ttl, result.to_json)
result
end
end
def invalidate(key)
@redis.del(key)
end
def invalidate_pattern(pattern)
keys = @redis.keys(pattern)
@redis.del(*keys) unless keys.empty?
end
end
# 使用查询缓存
# cache = QueryCache.new
#
# # 缓存用户信息
# user = cache.fetch("user:#{user_id}") do
# db.execute("SELECT * FROM users WHERE id = ?", [user_id]).first
# end
#
# # 缓存文章列表
# articles = cache.fetch("articles:page:#{page}:limit:#{limit}") do
# db.execute("SELECT * FROM articles LIMIT ? OFFSET ?", [limit, (page-1)*limit])
# end
#
# # 当数据更新时清除缓存
# cache.invalidate("user:#{user_id}")
# cache.invalidate_pattern("articles:*")🛡️ 数据库安全最佳实践
1. 连接安全
ruby
# 安全的数据库连接配置
class SecureDatabaseConnection
def self.sqlite3_config(database_path)
{
database: database_path,
# 启用外键约束
foreign_keys: true,
# 启用WAL模式提高并发性能
journal_mode: 'wal'
}
end
def self.mysql_config
{
host: ENV['DB_HOST'] || 'localhost',
username: ENV['DB_USERNAME'],
password: ENV['DB_PASSWORD'],
database: ENV['DB_NAME'],
port: ENV['DB_PORT'] || 3306,
# SSL连接
ssl_mode: :require,
# 连接池配置
pool: 5,
reconnect: true
}
end
def self.postgresql_config
{
host: ENV['DB_HOST'] || 'localhost',
user: ENV['DB_USERNAME'],
password: ENV['DB_PASSWORD'],
dbname: ENV['DB_NAME'],
port: ENV['DB_PORT'] || 5432,
# SSL连接
sslmode: 'require',
# 连接池配置
max_connections: 5
}
end
end
# 环境变量配置示例
=begin
# .env文件
DB_HOST=localhost
DB_USERNAME=myapp
DB_PASSWORD=secure_password
DB_NAME=myapp_production
DB_PORT=5432
=end2. 数据保护
ruby
# 敏感数据加密
require 'openssl'
require 'base64'
class DataEncryption
def self.encrypt(data, key = ENV['ENCRYPTION_KEY'])
cipher = OpenSSL::Cipher.new('AES-256-CBC')
cipher.encrypt
cipher.key = Digest::SHA256.digest(key)
iv = cipher.random_iv
encrypted = cipher.update(data) + cipher.final
Base64.encode64(iv + encrypted)
end
def self.decrypt(encrypted_data, key = ENV['ENCRYPTION_KEY'])
decoded = Base64.decode64(encrypted_data)
iv = decoded[0, 16]
encrypted = decoded[16..-1]
cipher = OpenSSL::Cipher.new('AES-256-CBC')
cipher.decrypt
cipher.key = Digest::SHA256.digest(key)
cipher.iv = iv
cipher.update(encrypted) + cipher.final
end
end
# 在模型中使用加密
class User < ActiveRecord::Base
# 加密敏感字段
def ssn=(value)
write_attribute(:ssn, DataEncryption.encrypt(value))
end
def ssn
encrypted = read_attribute(:ssn)
DataEncryption.decrypt(encrypted) if encrypted
end
end
# 数据脱敏
class DataMasking
def self.mask_email(email)
return email if email.nil?
username, domain = email.split('@')
masked_username = username[0] + '*' * (username.length - 2) + username[-1]
"#{masked_username}@#{domain}"
end
def self.mask_phone(phone)
return phone if phone.nil?
phone.gsub(/(\d{3})\d{4}(\d{4})/, '\1****\2')
end
def self.mask_id_card(id_card)
return id_card if id_card.nil?
id_card.gsub(/(\d{6})\d{8}(\d{4})/, '\1********\2')
end
end
# 使用数据脱敏
# puts DataMasking.mask_email("user@example.com") # u**r@example.com
# puts DataMasking.mask_phone("13812345678") # 138****5678
# puts DataMasking.mask_id_card("110101199001011234") # 110101********12343. 访问控制
ruby
# 数据库访问控制
class DatabaseAccessControl
# 数据库用户权限管理
def self.setup_database_users
# 创建应用用户(限制权限)
sql = <<~SQL
CREATE USER 'app_user'@'localhost' IDENTIFIED BY 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.* TO 'app_user'@'localhost';
FLUSH PRIVILEGES;
SQL
# 执行SQL(需要管理员权限)
end
# 查询日志记录
class QueryLogger
def initialize(log_file = 'db_queries.log')
@logger = Logger.new(log_file)
end
def log_query(sql, params = [], user_id = nil, ip_address = nil)
@logger.info({
timestamp: Time.now,
sql: sql,
params: params,
user_id: user_id,
ip_address: ip_address
}.to_json)
end
end
# 慢查询监控
class SlowQueryMonitor
def initialize(threshold = 1.0) # 1秒阈值
@threshold = threshold
@logger = Logger.new('slow_queries.log')
end
def monitor_query(sql, params = [])
start_time = Time.now
result = yield # 执行查询
end_time = Time.now
duration = end_time - start_time
if duration > @threshold
@logger.warn({
sql: sql,
params: params,
duration: duration,
timestamp: start_time
}.to_json)
end
result
end
end
end📚 下一步学习
掌握了Ruby数据库访问后,建议继续学习:
- Ruby CGI 编程 - 学习CGI应用程序开发
- Ruby 发送邮件 - SMTP - 掌握邮件发送功能
- Ruby Socket 编程 - 了解网络编程
- Ruby Web服务 - 学习Web服务开发
继续您的Ruby学习之旅吧!