Rust编程-核心篇-异步编程
来源:     阅读:2
易浩激活码
发布于 2025-10-20 21:56
查看主页

异步编程的必要性

在现代软件开发中,I/O密集型任务(如网络请求、文件操作、数据库查询)占据了很大比重。传统的同步编程模型在处理这些任务时,线程会被阻塞,导致资源利用率低下。异步编程通过非阻塞I/O和协作式多任务,能够显著提高程序的并发性能。

Rust的异步编程模型基于async/await语法,结合强劲的类型系统和零成本抽象,提供了既高效又安全的异步编程体验。与传统的回调地狱和Promise链相比,Rust的异步代码看起来像同步代码,但具有异步的性能优势。

异步基础

基本语法

use std::future::Future;use std::pin::Pin;use std::task::{Context, Poll};// 简单的异步函数async fn hello_world() {    println!("Hello, world!");
}// 异步函数返回Futureasync fn get_data() -> String {    // 模拟异步操作
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;    "Data retrieved".to_string()
}#[tokio::main]async fn main() {    let data = get_data().await;    println!("{}", data);
}

异步函数组合

async fn fetch_user(id: u32) -> Result<String, String> {    // 模拟网络请求
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;    Ok(format!("User {}", id))
}async fn fetch_posts(user_id: u32) -> Result<Vec<String>, String> {    // 模拟数据库查询
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;    Ok(vec![        format!("Post 1 by user {}", user_id),        format!("Post 2 by user {}", user_id),
    ])
}async fn get_user_with_posts(id: u32) -> Result<(String, Vec<String>), String> {    let user = fetch_user(id).await?;    let posts = fetch_posts(id).await?;    Ok((user, posts))
}#[tokio::main]async fn main() {    match get_user_with_posts(1).await {        Ok((user, posts)) => {            println!("User: {}", user);            for post in posts {                println!("  - {}", post);
            }
        }        Err(e) => println!("Error: {}", e),
    }
}

Future trait

use std::future::Future;use std::pin::Pin;use std::task::{Context, Poll};use std::time::{Duration, Instant};struct Delay {
    when: Instant,
}impl Future for Delay {    type Output = &'static str;    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {        if Instant::now() >= self.when {            println!("Hello world");
            Poll::Ready("done")
        } else {            // 获取当前任务的Waker            let waker = cx.waker().clone();            let when = self.when;            
            // 在另一个线程中设置定时器
            std::thread::spawn(move || {                let now = Instant::now();                if now < when {
                    std::thread::sleep(when - now);
                }
                waker.wake();
            });
            
            Poll::Pending
        }
    }
}#[tokio::main]async fn main() {    let when = Instant::now() + Duration::from_millis(10);    let future = Delay { when };    
    let out = future.await;    assert_eq!(out, "done");
}

并发执行

使用join!并发执行

use tokio::time::{sleep, Duration};async fn fetch_data_from_api() -> String {
    sleep(Duration::from_millis(100)).await;    "API data".to_string()
}async fn fetch_data_from_db() -> String {
    sleep(Duration::from_millis(150)).await;    "DB data".to_string()
}async fn fetch_data_from_cache() -> String {
    sleep(Duration::from_millis(50)).await;    "Cache data".to_string()
}#[tokio::main]async fn main() {    let start = std::time::Instant::now();    
    // 并发执行多个异步操作    let (api_data, db_data, cache_data) = tokio::join!(
        fetch_data_from_api(),
        fetch_data_from_db(),
        fetch_data_from_cache()
    );    
    let duration = start.elapsed();    println!("All data fetched in {:?}", duration);    println!("API: {}, DB: {}, Cache: {}", api_data, db_data, cache_data);
}

使用select!选择最快的结果

use tokio::time::{sleep, Duration, timeout};async fn slow_operation() -> String {
    sleep(Duration::from_secs(2)).await;    "Slow result".to_string()
}async fn fast_operation() -> String {
    sleep(Duration::from_millis(100)).await;    "Fast result".to_string()
}#[tokio::main]async fn main() {
    tokio::select! {
        result = slow_operation() => {            println!("Slow operation completed: {}", result);
        }
        result = fast_operation() => {            println!("Fast operation completed: {}", result);
        }
        _ = sleep(Duration::from_secs(1)) => {            println!("Timeout reached");
        }
    }
}

异步流处理

use tokio_stream::{self as stream, StreamExt};use std::time::Duration;async fn process_stream() {    let mut stream = stream::iter(1..=10)
        .then(|i| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            i * 2
        })
        .filter(|&x| x % 4 == 0)
        .map(|x| format!("Processed: {}", x));    
    while let Some(value) = stream.next().await {        println!("{}", value);
    }
}#[tokio::main]async fn main() {
    process_stream().await;
}

实际应用

use reqwest;use serde_json;use std::collections::HashMap;#[derive(Debug, serde::Deserialize)]struct User {
    id: u32,
    name: String,
    email: String,
}#[derive(Debug, serde::Deserialize)]struct Post {
    id: u32,
    title: String,
    body: String,
    userId: u32,
}async fn fetch_user(id: u32) -> Result<User, reqwest::Error> {    let url = format!("https://jsonplaceholder.typicode.com/users/{}", id);    let response = reqwest::get(&url).await?;    let user: User = response.json().await?;    Ok(user)
}async fn fetch_user_posts(user_id: u32) -> Result<Vec<Post>, reqwest::Error> {    let url = format!("https://jsonplaceholder.typicode.com/posts?userId={}", user_id);    let response = reqwest::get(&url).await?;    let posts: Vec<Post> = response.json().await?;    Ok(posts)
}async fn get_user_with_posts(id: u32) -> Result<(User, Vec<Post>), reqwest::Error> {    let user = fetch_user(id).await?;    let posts = fetch_user_posts(id).await?;    Ok((user, posts))
}#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> {    let start = std::time::Instant::now();    
    // 并发获取多个用户的数据    let futures = (1..=5).map(|id| get_user_with_posts(id));    let results = futures::future::join_all(futures).await;    
    let duration = start.elapsed();    println!("Fetched {} users in {:?}", results.len(), duration);    
    for result in results {        match result {            Ok((user, posts)) => {                println!("User: {} ({} posts)", user.name, posts.len());
            }            Err(e) => {                println!("Error fetching user: {}", e);
            }
        }
    }    
    Ok(())
}

异步数据库操作

use sqlx::{PgPool, Row};use serde::{Deserialize, Serialize};#[derive(Debug, Serialize, Deserialize)]struct User {
    id: i32,
    name: String,
    email: String,
}#[derive(Debug, Serialize, Deserialize)]struct Post {
    id: i32,
    title: String,
    content: String,
    user_id: i32,
}struct Database {
    pool: PgPool,
}impl Database {    async fn new(database_url: &str) -> Result<Self, sqlx::Error> {        let pool = PgPool::connect(database_url).await?;        Ok(Self { pool })
    }    
    async fn create_user(&self, name: &str, email: &str) -> Result<User, sqlx::Error> {        let row = sqlx::query!(            "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email",
            name, email
        )
        .fetch_one(&self.pool)
        .await?;        
        Ok(User {
            id: row.id,
            name: row.name,
            email: row.email,
        })
    }    
    async fn get_user(&self, id: i32) -> Result<Option<User>, sqlx::Error> {        let row = sqlx::query!(            "SELECT id, name, email FROM users WHERE id = $1",
            id
        )
        .fetch_optional(&self.pool)
        .await?;        
        Ok(row.map(|r| User {
            id: r.id,
            name: r.name,
            email: r.email,
        }))
    }    
    async fn get_user_posts(&self, user_id: i32) -> Result<Vec<Post>, sqlx::Error> {        let rows = sqlx::query!(            "SELECT id, title, content, user_id FROM posts WHERE user_id = $1",
            user_id
        )
        .fetch_all(&self.pool)
        .await?;        
        Ok(rows.into_iter().map(|row| Post {
            id: row.id,
            title: row.title,
            content: row.content,
            user_id: row.user_id,
        }).collect())
    }    
    async fn create_post(&self, title: &str, content: &str, user_id: i32) -> Result<Post, sqlx::Error> {        let row = sqlx::query!(            "INSERT INTO posts (title, content, user_id) VALUES ($1, $2, $3) RETURNING id, title, content, user_id",
            title, content, user_id
        )
        .fetch_one(&self.pool)
        .await?;        
        Ok(Post {
            id: row.id,
            title: row.title,
            content: row.content,
            user_id: row.user_id,
        })
    }
}#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> {    let db = Database::new("postgresql://user:password@localhost/mydb").await?;    
    // 创建用户    let user = db.create_user("Alice", "alice@example.com").await?;    println!("Created user: {:?}", user);    
    // 并发创建多个帖子    let post_futures = (1..=5).map(|i| {
        db.create_post(
            &format!("Post {}", i),
            &format!("Content of post {}", i),
            user.id
        )
    });    
    let posts = futures::future::join_all(post_futures).await;    for post in posts {        match post {            Ok(post) => println!("Created post: {:?}", post),            Err(e) => println!("Error creating post: {}", e),
        }
    }    
    // 获取用户及其帖子    if let Some(user) = db.get_user(user.id).await? {        let posts = db.get_user_posts(user.id).await?;        println!("User {} has {} posts", user.name, posts.len());
    }    
    Ok(())
}

异步Web服务器

use axum::{
    extract::{Path, Query},
    http::StatusCode,
    response::Json,
    routing::{get, post},
    Router,
};use serde::{Deserialize, Serialize};use std::collections::HashMap;use tokio::time::{sleep, Duration};#[derive(Serialize, Deserialize)]struct User {
    id: u32,
    name: String,
    email: String,
}#[derive(Deserialize)]struct CreateUser {
    name: String,
    email: String,
}#[derive(Deserialize)]struct UserQuery {
    limit: Option<u32>,
    offset: Option<u32>,
}// 模拟数据库type UserDb = std::sync::Arc<tokio::sync::RwLock<HashMap<u32, User>>>;async fn get_users(
    Query(params): Query<UserQuery>,
    Extension(db): Extension<UserDb>,
) -> Result<Json<Vec<User>>, StatusCode> {    let db = db.read().await;    let users: Vec<User> = db.values().cloned().collect();    
    let limit = params.limit.unwrap_or(10);    let offset = params.offset.unwrap_or(0);    
    let users: Vec<User> = users
        .into_iter()
        .skip(offset as usize)
        .take(limit as usize)
        .collect();    
    Ok(Json(users))
}async fn get_user(
    Path(id): Path<u32>,
    Extension(db): Extension<UserDb>,
) -> Result<Json<User>, StatusCode> {    let db = db.read().await;    match db.get(&id) {        Some(user) => Ok(Json(user.clone())),        None => Err(StatusCode::NOT_FOUND),
    }
}async fn create_user(
    Json(payload): Json<CreateUser>,
    Extension(db): Extension<UserDb>,
) -> Result<Json<User>, StatusCode> {    let mut db = db.write().await;    let id = (db.len() as u32) + 1;    
    let user = User {
        id,
        name: payload.name,
        email: payload.email,
    };
    
    db.insert(id, user.clone());    Ok(Json(user))
}async fn slow_operation() -> Json<serde_json::Value> {    // 模拟慢操作
    sleep(Duration::from_secs(2)).await;
    Json(serde_json::json!({"message": "Slow operation completed"}))
}#[tokio::main]async fn main() {    let db: UserDb = std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new()));    
    let app = Router::new()
        .route("/users", get(get_users).post(create_user))
        .route("/users/:id", get(get_user))
        .route("/slow", get(slow_operation))
        .layer(axum::Extension(db));    
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();    println!("Server running on http://0.0.0.0:3000");
    
    axum::serve(listener, app).await.unwrap();
}

异步错误处理

use thiserror::Error;#[derive(Error, Debug)]enum AppError {    #[error("Network error: {0}")]
    Network(#[from] reqwest::Error),    
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),    
    #[error("Validation error: {message}")]
    Validation { message: String },    
    #[error("Timeout error")]
    Timeout,
}async fn risky_operation() -> Result<String, AppError> {    // 模拟可能失败的操作
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;    
    if rand::random::<f32>() < 0.3 {        Err(AppError::Validation {
            message: "Random validation failure".to_string(),
        })
    } else {        Ok("Success".to_string())
    }
}async fn handle_errors() -> Result<(), AppError> {    let result = risky_operation().await?;    println!("Operation result: {}", result);    Ok(())
}#[tokio::main]async fn main() {    match handle_errors().await {        Ok(_) => println!("All operations completed successfully"),        Err(e) => println!("Error occurred: {}", e),
    }
}

性能优化技巧

1. 合理使用spawn

use tokio::task;async fn cpu_intensive_task() -> u32 {    // CPU密集型任务    let mut sum = 0;    for i in 0..1_000_000 {
        sum += i;
    }
    sum
}async fn io_intensive_task() -> String {    // I/O密集型任务
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;    "I/O completed".to_string()
}#[tokio::main]async fn main() {    // CPU密集型任务应该在阻塞线程池中运行    let cpu_handle = task::spawn_blocking(|| {        // 这里会在线程池中运行
        cpu_intensive_task()
    });    
    // I/O密集型任务可以在异步运行时中运行    let io_handle = task::spawn(io_intensive_task());    
    let (cpu_result, io_result) = tokio::join!(cpu_handle, io_handle);    println!("CPU result: {:?}", cpu_result);    println!("I/O result: {:?}", io_result);
}

2. 连接池管理

use sqlx::PgPool;struct AppState {
    db_pool: PgPool,
    http_client: reqwest::Client,
}impl AppState {    async fn new() -> Result<Self, Box<dyn std::error::Error>> {        let db_pool = PgPool::connect("postgresql://user:password@localhost/mydb").await?;        let http_client = reqwest::Client::new();        
        Ok(Self {
            db_pool,
            http_client,
        })
    }
}

常见陷阱与最佳实践

1. 避免阻塞异步运行时

// 不好的做法:在异步函数中使用阻塞操作async fn bad_function() {
    std::thread::sleep(std::time::Duration::from_secs(1)); // 阻塞整个运行时}// 好的做法:使用异步睡眠async fn good_function() {
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

2. 正确处理撤销

use tokio::time::{sleep, Duration, timeout};async fn cancellable_operation() -> Result<String, &'static str> {
    sleep(Duration::from_secs(5)).await;    Ok("Operation completed".to_string())
}#[tokio::main]async fn main() {    match timeout(Duration::from_secs(2), cancellable_operation()).await {        Ok(result) => println!("Result: {:?}", result),        Err(_) => println!("Operation timed out"),
    }
}

写在最后

Rust的异步编程模型提供了:

  1. 高性能:非阻塞I/O和协作式多任务

  2. 类型安全:编译时检查异步代码的正确性

  3. 零成本抽象:异步代码的性能接近手写的回调代码

  4. 易于使用:async/await语法让异步代码看起来像同步代码

异步编程是现代高性能应用程序的必备技能。通过Rust的异步编程模型,我们可以构建出既高效又安全的并发应用程序。

免责声明:本文为用户发表,不代表网站立场,仅供参考,不构成引导等用途。 系统环境
相关推荐
首页
搜索
订单
购物车
我的