数据库连接
问题
Node.js 如何连接数据库?什么是连接池?ORM 和原生查询有什么区别?
答案
Node.js 可以连接各种关系型和非关系型数据库。使用连接池复用连接提高性能,ORM 提供对象映射简化操作。
MySQL 连接
原生驱动 mysql2
import mysql from 'mysql2/promise';
// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
port: 3306,
user: 'root',
password: 'password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10, // 最大连接数
queueLimit: 0, // 排队无限制
enableKeepAlive: true,
keepAliveInitialDelay: 0
});
// 查询
async function getUsers() {
const [rows] = await pool.query('SELECT * FROM users');
return rows;
}
// 参数化查询(防注入)
async function getUserById(id: number) {
const [rows] = await pool.query(
'SELECT * FROM users WHERE id = ?',
[id]
);
return rows[0];
}
// 事务
async function transfer(fromId: number, toId: number, amount: number) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
await connection.query(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
[amount, fromId]
);
await connection.query(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
[amount, toId]
);
await connection.commit();
} catch (err) {
await connection.rollback();
throw err;
} finally {
connection.release(); // 释放回连接池
}
}
PostgreSQL 连接
import { Pool, PoolClient } from 'pg';
const pool = new Pool({
host: 'localhost',
port: 5432,
user: 'postgres',
password: 'password',
database: 'mydb',
max: 20, // 最大连接数
idleTimeoutMillis: 30000, // 空闲超时
connectionTimeoutMillis: 2000
});
// 查询
async function getUsers() {
const result = await pool.query('SELECT * FROM users');
return result.rows;
}
// 参数化查询
async function getUserById(id: number) {
const result = await pool.query(
'SELECT * FROM users WHERE id = $1',
[id]
);
return result.rows[0];
}
// 事务
async function withTransaction<T>(
callback: (client: PoolClient) => Promise<T>
): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
MongoDB 连接
import { MongoClient, Db, Collection, ObjectId } from 'mongodb';
const uri = 'mongodb://localhost:27017';
const client = new MongoClient(uri, {
maxPoolSize: 50, // 连接池大小
minPoolSize: 5,
maxIdleTimeMS: 30000
});
let db: Db;
// 连接
async function connect() {
await client.connect();
db = client.db('mydb');
console.log('Connected to MongoDB');
}
// CRUD 操作
interface User {
_id?: ObjectId;
name: string;
email: string;
createdAt: Date;
}
async function createUser(user: Omit<User, '_id'>) {
const collection: Collection<User> = db.collection('users');
const result = await collection.insertOne({
...user,
createdAt: new Date()
});
return result.insertedId;
}
async function findUsers(query: Partial<User> = {}) {
const collection: Collection<User> = db.collection('users');
return collection.find(query).toArray();
}
async function updateUser(id: string, update: Partial<User>) {
const collection: Collection<User> = db.collection('users');
return collection.updateOne(
{ _id: new ObjectId(id) },
{ $set: update }
);
}
// 聚合
async function getUserStats() {
const collection: Collection<User> = db.collection('users');
return collection.aggregate([
{
$group: {
_id: { $month: '$createdAt' },
count: { $sum: 1 }
}
},
{ $sort: { _id: 1 } }
]).toArray();
}
Redis 连接
import { createClient, RedisClientType } from 'redis';
const client: RedisClientType = createClient({
url: 'redis://localhost:6379',
socket: {
reconnectStrategy: (retries) => Math.min(retries * 50, 500)
}
});
client.on('error', (err) => console.error('Redis Error:', err));
client.on('connect', () => console.log('Redis connected'));
await client.connect();
// 基本操作
await client.set('key', 'value');
await client.setEx('key', 3600, 'value'); // 1小时过期
const value = await client.get('key');
// 哈希
await client.hSet('user:1', { name: 'Alice', age: '30' });
const user = await client.hGetAll('user:1');
// 列表
await client.lPush('queue', 'task1');
const task = await client.rPop('queue');
// 集合
await client.sAdd('tags', ['node', 'typescript']);
const tags = await client.sMembers('tags');
// 缓存模式
async function getWithCache<T>(
key: string,
fetcher: () => Promise<T>,
ttl: number = 3600
): Promise<T> {
const cached = await client.get(key);
if (cached) {
return JSON.parse(cached);
}
const data = await fetcher();
await client.setEx(key, ttl, JSON.stringify(data));
return data;
}
ORM 使用
Prisma
// schema.prisma
// generator client {
// provider = "prisma-client-js"
// }
//
// model User {
// id Int @id @default(autoincrement())
// email String @unique
// name String?
// posts Post[]
// createdAt DateTime @default(now())
// }
//
// model Post {
// id Int @id @default(autoincrement())
// title String
// author User @relation(fields: [authorId], references: [id])
// authorId Int
// }
import { PrismaClient } from '@prisma/client';
const prisma = new PrismaClient();
// CRUD
async function createUser(email: string, name?: string) {
return prisma.user.create({
data: { email, name }
});
}
async function getUserWithPosts(id: number) {
return prisma.user.findUnique({
where: { id },
include: { posts: true }
});
}
async function updateUser(id: number, data: { name?: string }) {
return prisma.user.update({
where: { id },
data
});
}
// 事务
async function createUserWithPost(email: string, title: string) {
return prisma.$transaction(async (tx) => {
const user = await tx.user.create({ data: { email } });
const post = await tx.post.create({
data: { title, authorId: user.id }
});
return { user, post };
});
}
Mongoose(MongoDB ODM)
import mongoose, { Schema, Document, Model } from 'mongoose';
interface IUser extends Document {
name: string;
email: string;
createdAt: Date;
}
const userSchema = new Schema<IUser>({
name: { type: String, required: true },
email: { type: String, required: true, unique: true },
createdAt: { type: Date, default: Date.now }
});
// 添加索引
userSchema.index({ email: 1 });
// 方法
userSchema.methods.getProfile = function() {
return { name: this.name, email: this.email };
};
// 静态方法
userSchema.statics.findByEmail = function(email: string) {
return this.findOne({ email });
};
const User: Model<IUser> = mongoose.model('User', userSchema);
// 使用
await mongoose.connect('mongodb://localhost:27017/mydb');
const user = new User({ name: 'Alice', email: 'alice@example.com' });
await user.save();
const found = await User.findOne({ email: 'alice@example.com' });
连接池原理
| 参数 | 说明 | 建议值 |
|---|---|---|
| max | 最大连接数 | CPU核数 × 2 ~ 4 |
| min | 最小连接数 | 0 ~ 5 |
| idleTimeout | 空闲超时 | 30s ~ 5min |
| connectionTimeout | 连接超时 | 2s ~ 10s |
常见面试问题
Q1: 什么是连接池?为什么需要连接池?
答案:
连接池预先创建并维护一定数量的数据库连接,复用这些连接处理请求。
优点:
- 减少开销:避免频繁创建/销毁连接
- 提高性能:复用已建立的连接
- 控制资源:限制最大连接数
- 提高稳定性:连接管理和健康检查
// 无连接池:每次请求新建连接
async function queryWithoutPool() {
const conn = await mysql.createConnection(config);
const result = await conn.query('SELECT 1');
await conn.end(); // 销毁连接
return result;
}
// 有连接池:复用连接
const pool = mysql.createPool(config);
async function queryWithPool() {
const result = await pool.query('SELECT 1');
return result; // 连接自动归还池
}
Q2: ORM 和原生 SQL 各有什么优缺点?
答案:
| 特性 | ORM | 原生 SQL |
|---|---|---|
| 开发效率 | 高 | 低 |
| 类型安全 | 好(Prisma) | 需手动定义 |
| 性能 | 有开销 | 最优 |
| 灵活性 | 较低 | 最高 |
| 学习曲线 | 需学习 API | 需学习 SQL |
| 复杂查询 | 可能受限 | 完全支持 |
// ORM - 简洁但灵活性受限
const users = await prisma.user.findMany({
where: { age: { gte: 18 } },
include: { posts: true }
});
// 原生 SQL - 灵活但繁琐
const [users] = await pool.query(`
SELECT u.*, GROUP_CONCAT(p.title) as posts
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.age >= ?
GROUP BY u.id
`, [18]);
Q3: 如何处理数据库连接错误?
答案:
// 重试机制
async function queryWithRetry<T>(
fn: () => Promise<T>,
maxRetries: number = 3
): Promise<T> {
let lastError: Error;
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (err) {
lastError = err as Error;
// 判断是否可重试
if (!isRetryableError(err)) {
throw err;
}
// 指数退避
await sleep(Math.pow(2, i) * 100);
}
}
throw lastError!;
}
function isRetryableError(err: any): boolean {
const retryableCodes = [
'ECONNRESET',
'ETIMEDOUT',
'ECONNREFUSED',
'ER_LOCK_DEADLOCK'
];
return retryableCodes.includes(err.code);
}
// 健康检查
async function checkDatabaseHealth(): Promise<boolean> {
try {
await pool.query('SELECT 1');
return true;
} catch {
return false;
}
}
Q4: 如何优化数据库查询性能?
答案:
| 方法 | 说明 |
|---|---|
| 索引 | 为常用查询字段添加索引 |
| 分页 | 使用 LIMIT/OFFSET 或游标分页 |
| 投影 | 只查询需要的字段 |
| 缓存 | Redis 缓存热点数据 |
| 批量操作 | 减少往返次数 |
| 读写分离 | 主从架构 |
// 游标分页(大数据量更高效)
async function getUsersWithCursor(cursor?: number, limit = 20) {
const where = cursor ? 'WHERE id > ?' : '';
const params = cursor ? [cursor, limit] : [limit];
const [rows] = await pool.query(
`SELECT id, name FROM users ${where} ORDER BY id LIMIT ?`,
params
);
return {
data: rows,
nextCursor: rows.length === limit ? rows[rows.length - 1].id : null
};
}
// 批量插入
async function batchInsert(users: User[]) {
const values = users.map(u => [u.name, u.email]);
await pool.query(
'INSERT INTO users (name, email) VALUES ?',
[values]
);
}
Q5: 什么是数据库事务?ACID 是什么?
答案:
事务是一组原子操作,要么全部成功,要么全部失败。
| 特性 | 含义 |
|---|---|
| A (Atomicity) | 原子性:全部成功或全部失败 |
| C (Consistency) | 一致性:数据始终保持有效状态 |
| I (Isolation) | 隔离性:事务间互不干扰 |
| D (Durability) | 持久性:提交后数据永久保存 |
// 转账事务示例
async function transfer(from: number, to: number, amount: number) {
const client = await pool.connect();
try {
await client.query('BEGIN');
// 检查余额
const { rows } = await client.query(
'SELECT balance FROM accounts WHERE id = $1 FOR UPDATE',
[from]
);
if (rows[0].balance < amount) {
throw new Error('余额不足');
}
// 扣款
await client.query(
'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
[amount, from]
);
// 入账
await client.query(
'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
[amount, to]
);
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}