锁定被迫交出时间片。
锁定意味着阻塞,多个线程(进程)排队获取资源,无法充分发挥系统性能。
锁定的阻塞无法通过fd进行通知,对性能有进一步的影响(理想的服务器模型是全局一处阻塞统一等待消息)。
一些锁限制了必须使用线程的方式进行开发,而线程无法充分利用系统的内存。
pthread库在特殊情况下可能产生饥饿的情况。
加锁的根本起因是什么?
资源竞争。
解决资源竞争的思路有哪些?
分资源:资源进一步分配,各个资源获得方不相往来。
分功能:对资源进行规划,各自处理不同功能。
做冗余:对资源进行冗余,对当前资源进行切换。
二次检查:不加锁执行后,检查是否被修改(CAS)。
少锁
原子操作与忙等待
CAS解法与ABA问题
seqlock
免锁
数据与进程对等的处理
单一生产者与单一消费者进程
下面让我们一个一个的来梳理无锁编程的内容吧。
ouble-checked Locking,严格意义上来讲不属于无锁范畴,无论什么时候当临界区中的代码仅仅需要加锁一次,同时当其获取锁的时候必须是线程安全的,此时就可以利用 Double-checked Locking 模式来减少锁竞争和加锁载荷。目前Double-checkedLocking已经广泛应用于单例 (Singleton)模式中。
Double-checked Locking有以下特点:
Double-checked Locking模式是Singleton的多线程版本。 Double-checked Locking模式依旧会使用锁——临界区锁定,不要以为可以避免使用锁。 Double-checked Locking解决的问题是:当多个线程存在访问临界区企图时,保证了临界区只需要访问一次。
以Singleton为例,为了防止多次分配,一般Singleton的实现方式是:
// 实现1
Class singleton
{
singleton* get_instance()
{
lock();
if (instance == 0)
{
instance = new singleton;
}
unlock();
return instance;
}
}这里存在的问题是:无论是否已经初始化都要加锁,增加了负荷,已经没有所谓的并发性能了。
要增加并发性能,可以先判断是否已经分配,在没分配的情况下才加锁,也许你想要改成下面这个样子:
// 实现2
Class singleton
{
singleton* get_instance()
{
if (instance == 0)
{
lock();
instance = new singleton;
unlock();
}
return instance;
}
}这里存在的问题是:不能保证临界区只初始化一次,没能实现singleton的基本功能。
// 实现3 - Double-checkedLocking
Class singleton
{
singleton* get_instance()
{
if (instance == 0)
{
lock();
if (instance == 0 )
{
instance = new singleton;
}
unlock();
}
return instance;
}
}严格的说,Double-checked locking不属于无锁编程的范畴,但由原来的每次加锁访问到大多数情况下无须加锁,就是一个巨大的进步。
【文章福利】:小编整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!~点击加入君羊(832218493需要自取)

原子操作可以保证指令以原子的方式执行——执行过程不被打断,原子操作是多数无锁编程的基本前提。
对1字节的读写
对2字节数(对齐到16位边界)读写
对4字节数(对齐到32位边界)读写
对8字节数(对齐到64位边界)读写
xchg
在x86平台上,CPU提供了在指令执行期间对总线加锁的手段。CPU芯片上有一条引线#HLOCK pin,如果汇编语言的程序中在一条指令前面加上前缀"LOCK",经过汇编后来的机器代码就使CPU在执行这条指令的时候把#HLOCK pin的电位拉低,持续到这条指令结束时放开,从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。
LOCK是一个指令的描述符,表明后续的指令在执行的时候,在内存总线上加锁。总线锁会导致其他几个核在必定时钟周期内无法访问内存。虽然总线锁会影响其他核的性能,但比起操作系统级别的锁,已经轻量太多了。
#lock是锁FSB(前端串行总线,front serial bus),FSB是处理器和RAM之间的总线,锁住了它,就能阻止其他处理器或core从RAM获取数据。
声明和定义:
void atomic_set(atomic_t *v, int i);
atomic_t v = ATOMIC_INIT(0);读写操作:
int atomic_read(atomic_t *v);
void atomic_add(int i, atomic_t *v);
void atomic_sub(int i, atomic_t *v);加一减一:
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);执行操作并且测试结果:执行操作之后,如果v是0,那么返回1,否则返回0
int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
int atomic_add_negative(int i, atomic_t *v);
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);gcc内置的__sync_*函数提供了加减和逻辑运算的原子操作,__sync_fetch_and_add系列一共有十二个函数,有加/减/与/或/异或/等函数的原子性操作函数,__sync_fetch_and_add,顾名思义,先fetch,然后自加,返回的是自加以前的值。以count = 4为例,调用__sync_fetch_and_add(&count,1),之后,返回值是4,然后,count变成了5. 有__sync_fetch_and_add,自然也就有__sync_add_and_fetch,先自加,再返回。这两个的关系与i++和++i的关系是一样的。
type可以是1,2,4或8字节长度的int类型,即:
int8_t / uint8_t
int16_t / uint16_t
int32_t / uint32_t
int64_t / uint64_t
type __sync_fetch_and_add (type *ptr, typevalue);
type __sync_fetch_and_sub (type *ptr, type value);
type __sync_fetch_and_or (type *ptr, type value);
type __sync_fetch_and_and (type *ptr, type value);
type __sync_fetch_and_xor (type *ptr, type value);
type __sync_fetch_and_nand(type *ptr, type value);
type __sync_add_and_fetch (type *ptr, typevalue);
type __sync_sub_and_fetch (type *ptr, type value);
type __sync_or_and_fetch (type *ptr, type value);
type __sync_and_and_fetch (type *ptr, type value);
type __sync_xor_and_fetch (type *ptr, type value);
type __sync_nand_and_fetch (type *ptr, type value);代码讲解1:使用__sync_fetch_and_add操作全局变量
<strong>#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdint.h>
int count = 0;
void *test_func(void *arg)
{
int i=0;
for(i=0;i<2000000;++i)
{
__sync_fetch_and_add(&count,1);
}
return NULL;
}
int main(int argc, const char *argv[])
{
pthread_t id[20];
int i = 0;
uint64_t usetime;
struct timeval start;
struct timeval end;
gettimeofday(&start,NULL);
for(i=0;i<20;++i)
{
pthread_create(&id[i],NULL,test_func,NULL);
}
for(i=0;i<20;++i)
{
pthread_join(id[i],NULL);
}
gettimeofday(&end,NULL);
usetime = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec);
printf("count = %d, usetime = %lu usecs
", count, usetime);
return 0;
}
</strong>代码讲解2:使用互斥锁mutex操作全局变量
<strong>#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdint.h>
int count = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *test_func(void *arg)
{
int i=0;
for(i=0;i<2000000;++i)
{
pthread_mutex_lock(&mutex);
++count;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main(int argc, const char *argv[])
{
pthread_t id[20];
int i = 0;
uint64_t usetime;
struct timeval start;
struct timeval end;
gettimeofday(&start,NULL);
for(i=0;i<20;++i)
{
pthread_create(&id[i],NULL,test_func,NULL);
}
for(i=0;i<20;++i)
{
pthread_join(id[i],NULL);
}
gettimeofday(&end,NULL);
usetime = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec);
printf("count = %d, usetime = %lu usecs
", count, usetime);
return 0;
}
</strong>结果说明:
[root@rocket lock-free]#./atom_add_gcc_buildin
count = 40000000, usetime = 756694 usecs
[root@rocket lock-free]# ./atom_add_mutex
count = 40000000, usetime = 3247131 usecs
可以看到,使用原子操作是使用互斥锁性能的5倍左右,随着冲突数量的增加,性能差距会进一步拉开。Alexander Sandler实测,原子操作性能大致是互斥锁的6-7倍左右。
有兴趣的同学请参考:
http://www.alexonlinux.com/multithreaded-simple-data-type-access-and-atomic-variables
xchg(ptr, new) 将ptr指向的值置为new,返回交换前的值。
cmpxchg(ptr, old, new) 比较当前值如果跟old一样,则将ptr指向的值置为new,否则不变,返回交换前的值。根据比较返回值是否和old一样来判断是否成功。
int fetch_and_add(int* i, int value, int* confict)
{
int old_value;
int new_value;
int v;
do
{
old_value = *i;
new_value = old_value + 1;
v = cmpxchg(i, old_value, new_value);
(*confict)++;
} while (old_value != v);
}忙等待可以认为是一种特殊的忙等待
Peterson算法
xchg解法
TSL解法
自旋锁
Peterson算法是一个实现互斥锁的并发程序设计算法,可以控制两个线程访问一个共享的单用户资源而不发生访问冲突。GaryL. Peterson于1981年提出此算法。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdint.h>
int count = 0;
#define N 2
volatile int turn;
volatile int interested[N] = {0};
void enter_region(int process)
{
int other = 1 - process; //另一个进程
interested[process] = true;
turn = process;
while (turn == process && interested[other] == true) NULL; //一直循环,直到other进程退出临界区
}
void leave_region(int process)
{
interested[process] = false; // leave critical region
}
void *test_func(void *arg)
{
int process = *((int *)arg);
printf("thread %d run
", process);
int i=0;
for(i=0;i<2000000;++i)
{
enter_region(process);
//printf("%d enter, count = %d
", pthread_self(),count);
++count;
leave_region(process);
}
return NULL;
}
int main(int argc, const char *argv[])
{
pthread_t id[N];
int process[N];
int i = 0;
uint64_t usetime;
struct timeval start;
struct timeval end;
gettimeofday(&start,NULL);
for(i=0;i<N;++i)
{
process[i] = i;
}
for(i=0;i<N;++i)
{
pthread_create(&id[i],NULL,test_func,&process[i]);
}
for(i=0;i<N;++i)
{
pthread_join(id[i],NULL);
}
gettimeofday(&end,NULL);
usetime = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec);
printf("count = %d, usetime = %lu usecs
", count, usetime);
return 0;
}结果说明:
[root@rocket lock-free]#./busywait_peterson
thread 0 run
thread 1 run
count = 3999851, usetime = 263132 usecs
可以看出,虽然是互斥算法,但是实测的结果缺不是十分准确,有少量的count丢失,这点让人感到很差异,这里先不去深究,有经验的同学可以帮忙分析一下缘由。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <asm/system.h>
#include <sys/time.h>
#include <stdint.h>
volatile int in_using = 0;
int count = 0;
#define N 2
void enter_region()
{
while (xchg(&in_using, 1)) NULL;
}
void leave_region()
{
in_using = 0; // leave critical region
}
void *test_func(void *arg)
{
int i=0;
for(i=0;i<2000000;++i)
{
enter_region();
++count;
leave_region();
}
return NULL;
}
int main(int argc, const char *argv[])
{
pthread_t id[20];
int i = 0;
uint64_t usetime;
struct timeval start;
struct timeval end;
gettimeofday(&start,NULL);
for(i=0;i<N;++i)
{
pthread_create(&id[i],NULL,test_func,NULL);
}
for(i=0;i<N;++i)
{
pthread_join(id[i],NULL);
}
gettimeofday(&end,NULL);
usetime = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec);
printf("count = %d, usetime = %lu usecs
", count, usetime);
return 0;
}结果说明:这个结果自然是超级准确,感觉比peterson算法靠谱多了,性能倒是差别不大。
[root@rocket lock-free]# ./busywait_xchg
count = 4000000, usetime = 166548 usecs
enter_region:
tsl register, lock |复制lock到寄存器,并将lock置为1
cmp register, #0 | lock等于0吗?
jne enter_region |如果不等于0,已上锁,再次循环
ret |返回调用程序,进入临界区
leave_region:
move lock, #0 |置lock为0
ret |返回调用程序
自旋锁请参考我的另一篇文章,这里不再赘述。
Linux同步机制(一) - 线程锁
一般采用原子级的read-modify-write原语来实现Lock-Free算法,其中LL和SC是Lock-Free理论研究领域的理想原语,但实现这些原语需要CPU指令的支持,超级遗憾的是目前没有任何CPU直接实现了SC原语。根据此理论,业界在原子操作的基础上提出了著名的CAS(Compare-And-Swap)操作来实现Lock-Free算法,Intel实现了一条类似该操作的指令:cmpxchg8。
CAS原语负责将某处内存地址的值(1个字节)与一个期望值进行比较,如果相等,则将该内存地址处的值替换为新值,CAS 操作伪码描述如下:
Bool CAS(T* addr, T expected, T newValue)
{
if(*addr == expected )
{
*addr= newValue;
returntrue;
}
else
returnfalse;
}CAS实际操作
do
{
备份旧数据;
基于旧数据构造新数据;
}while(!CAS(内存地址,备份的旧数据,新数据))
就是指当两者进行比较时,如果相等,则证明共享数据没有被修改,替换成新值,然后继续往下运行;如果不相等,说明共享数据已经被修改,放弃已经所做的操作,然后重新执行刚才的操作。容易看出CAS操作是基于共享数据不会被修改的假设,采用了类似于数据库的commit-retry的模式。当同步冲突出现的机会很少时,这种假设能带来较大的性能提升。
cmpxchg先比较内存地址的值是否与传入的值相等,如果相等则执行xchg逻辑。
inline int CAS(unsigned long* mem, unsignedlong newval, unsigned long oldval)
{
__typeof(*mem) ret;
//这里测试的使用64位系统,如果是32位,这里使用cmpschgl
__asm__volatile ("lock; cmpxchgq %2,%1"
:"=a"(ret), "=m"(*mem)
:"r"(newval), "m"(*mem), "0"(oldval));
returnret==oldval;
}CAS举例(简单应用AtomicInc)
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdint.h>
int count = 0;
inline int CAS(unsigned long* mem, unsigned long oldval, unsigned long newval)
{
__typeof (*mem) ret;
// 这里测试的使用64位系统,如果是32位,这里使用cmpschgl
__asm __volatile ("lock; cmpxchgq %2,%1"
: "=a"(ret), "=m"(*mem)
: "r"(newval), "m"(*mem), "0"(oldval));
return ret==oldval;
}
void AtomicInc(int* addr)
{
int oldval;
int newval;
do
{
oldval = *addr;
newval = oldval+1;
} while(!CAS((unsigned long*)addr, oldval, newval));
}
void *test_func(void *arg)
{
int i=0;
int confict = 0;
for(i=0;i<2000000;++i)
{
AtomicInc(&count);
}
return NULL;
}
int main(int argc, const char *argv[])
{
pthread_t id[20];
int i = 0;
uint64_t usetime;
struct timeval start;
struct timeval end;
gettimeofday(&start,NULL);
for(i=0;i<20;++i)
{
pthread_create(&id[i],NULL,test_func,NULL);
}
for(i=0;i<20;++i)
{
pthread_join(id[i],NULL);
}
gettimeofday(&end,NULL);
usetime = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec);
printf("count = %d, usetime = %lu usecs
", count, usetime);
return 0;
}struct Node
{
Node* next;
int data;
}
Node* head = NULL;
void push(int t)
{
Node* node = new Node(t);
do
{
node->next = head;
} while (!CAS(&head, node->next, node));
}
bool pop(int&t )
{
Node* current = head;
while(current)
{
if (CAS(&head, current, current->next)) // ABA问题
{
t = current->data;
return true;
}
current = head;
}
return false;
}一般的CAS在决定是否要修改某个变量时,会判断一下当前值跟旧值是否相等。如果相等,则认为变量未被其他线程修改,可以改。 但是,“相等”并不真的意味着“未被修改”。另一个线程可能会把变量的值从A改成B,又从B改回成A。这就是ABA问题。 许多情况下,ABA问题不会影响你的业务逻辑因此可以忽略。但有时不能忽略,这时要解决这个问题,一般的做法是给变量关联一个只能递增、不能递减的版本号。在compare时不但compare变量值,还要再compare一下版本号。 Java里的AtomicStampedReference类就是干这个的。
RCU就是指读-拷贝修改,它是基于其原理命名的。对于被RCU保护的共享数据结构,读操作不需要获得任何锁就可以访问,但写操作在访问它时第一拷贝一个副本,然后对副本进行修改,最后在适当的时机把指向原来数据的指针重新指向新的被修改的数据。这个时机就是所有引用该数据的CPU都退出对共享数据的操作。
Linux内核中内存管理大量的运用到了RCU机制。为每个内存对象增加了一个原子计数器用来继续该对象当前访问数。当没有其他进程在访问该对象时(计数器为0),才允许回收该内存。
从这个流程可以看出,RCU类似于一种读写锁的优化,用于解决读和写之间的同步问题。比较适合读多,写少的情况,当写操作过多的时候,这里的拷贝和修改的成本同样也很大。(写操作和写操作之间的同步还需要其它机制来保证)。
代码讲解:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int currentidx = 0;
char* str[2] = {0};
void* consume(void *arg)
{
sleep(1);
while(1)
{
printf("************************consumed %s, index %d, self %d
",str[currentidx], currentidx, pthread_self());
sleep(1);
}
return NULL;
}
void* produce( void * arg )
{
const char* s_str1 = "hello";
const char* s_str2 = "world";
while(1)
{
printf("product begin
");
// read copy
int other = 1 - currentidx;
str[other] = (char*)malloc(6);
if (other == 0)
{
strncpy(str[other], s_str1, 6);
}
else
{
strncpy(str[other], s_str2, 6);
}
// update原子的修改索引
currentidx = other;
// delete old currentidx
free(str[1-currentidx]);
sleep(5);
}
return NULL;
}
int main( void )
{
pthread_t thread1,thread2;
pthread_create(&thread1, NULL, &produce, NULL );
pthread_create(&thread2, NULL, &consume, NULL );
pthread_join(thread1,NULL);
pthread_join(thread2,NULL);
return 0;
}结果说明:
[root@rocket lock-free]# ./lockfree_rcu
product begin
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
product begin
************************consumed hello, index0, self 1395513088
************************consumed hello, index0, self 1395513088
************************consumed hello, index0, self 1395513088
************************consumed hello, index0, self 1395513088
************************consumed hello, index0, self 1395513088
product begin
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
************************consumed world, index1, self 1395513088
用于能够区分读与写的场合,并且是读操作许多、写操作很少,写操作的优先权大于读操作。 seqlock的实现思路是,用一个递增的整型数表明sequence。写操作进入临界区时,sequence++;退出临界区时,sequence再++。写操作还需要获得一个锁(列如mutex),这个锁仅用于写写互斥,以保证同一时间最多只有一个正在进行的写操作。 当sequence为奇数时,表明有写操作正在进行,这时读操作要进入临界区需要等待,直到sequence变为偶数。读操作进入临界区时,需要记录下当前sequence的值,等它退出临界区的时候用记录的sequence与当前sequence做比较,不相等则表明在读操作进入临界区期间发生了写操作,这时候读操作读到的东西是无效的,需要返回重试。 seqlock写写是必须要互斥的。但是seqlock的应用场景本身就是读多写少的情况,写冲突的概率是很低的。所以这里的写写互斥基本上不会有什么性能损失。 而读写操作是不需要互斥的。seqlock的应用场景是写操作优先于读操作,对于写操作来说,几乎是没有阻塞的(除非发生写写冲突这一小概率事件),只需要做sequence++这一附加动作。而读操作也不需要阻塞,只是当发现读写冲突时需要retry。 seqlock的一个典型应用是时钟的更新,系统中每1毫秒会有一个时钟中断,相应的中断处理程序会更新时钟(写操作)。而用户程序可以调用gettimeofday之类的系统调用来获取当前时间(读操作)。在这种情况下,使用seqlock可以避免过多的gettimeofday系统调用把中断处理程序给阻塞了(如果使用读写锁,而不用seqlock的话就会这样)。中断处理程序总是优先的,而如果gettimeofday系统调用与之冲突了,那用户程序多等等也无妨。 seqlock的实现超级简单: 写操作进入临界区时:
void write_seqlock(seqlock_t *sl)
{
spin_lock(&sl->lock); // 上写写互斥锁
++sl->sequence; // sequence++
}写操作退出临界区时:
void write_sequnlock(seqlock_t *sl)
{
sl->sequence++; // sequence再++
spin_unlock(&sl->lock); // 释放写写互斥锁
}
读操作进入临界区时: unsigned read_seqbegin(const seqlock_t *sl)
{
unsigned ret;
repeat:
ret = sl->sequence; // 读sequence值
if (unlikely(ret & 1)) { // 如果sequence为奇数自旋等待
goto repeat;
}
return ret;
}读操作尝试退出临界区时:
int read_seqretry(const seqlock_t *sl, unsigned start)
{
return (sl->sequence != start); //看看sequence与进入临界区时是否发生过改变
}而读操作一般会这样进行:
do {
seq = read_seqbegin(&seq_lock);// 进入临界区
do_something();
} while (read_seqretry(&seq_lock, seq)); // 尝试退出临界区,存在冲突则重试场景:某服务需要支持海量用户,在一台物理机器上运行了多个进程/线程。对于数据应该如何处理以保证安全快速的访问数据呢?
解决方案:“分”
分号段
分进程
分端口
分库分表

场景:
网络接入进程与逻辑处理进程通过共享内存通讯。我们要如何进行设计?
一般的实现:读写加锁
示例:无锁内存队列的实现

append_data(srcbuf, buflen)
int usedSize = (m_head->size + m_head->endPos – m_head->beginPos) % m_head->size;
int leftSize = m_head->size – usedSize;
if (leftSize < buflen) return -1;
_copy_to_queue();
Int newpos = ….;
m_head->endPos = newpos;
take_data(dstbuf, buflen)
if (m_head->beginPos == m_head->endPos) return -1;
_copy_from_queue();
_do_copy_data();
int newpos = ….;
m_head->beginPos = newpos;场景:
1、 进程需要可以动态加载配置,我们需要怎么做?
2、 进一步,如果配置超级复杂,各个配置具有必定的依赖性,配置检查错误的话加载配置将会失败。我们如何设计才能安全、动态、无锁的加载配置?
一般的实现:
重启进程
发送信号
创建管理端口
示例1:直接访问共享内存

示例2:双配置缓冲区的实现

代码:
cfg_t* get_cur_cfg
return cfg_list + cur_idx;
load_cfg
int another = 1 – cur_idx;
_load_cfg(cfg_list + another);
cur_idx = another;