Nginx、HAProxy、LVS三者的优缺点
一、Nginx优点: 1、工作在网络7层之上,可针对http应用做一些分流的策略,如针对域名、目录结构,它的正规规则比HAProxy更为强大和灵活,所以,目前为止广泛流行。 2、Nginx对网络稳定性的依赖非常小,理论上能ping通就能进行负载功能。 3、Nginx安装与配置比较简单,测试也比较方便,基本能把错误日志打印出来。 4、可以承担高负载压力且稳定,硬件不差的情况下一般能支撑几万次的并发量,负载度比LVS小。 5、Nginx可以通过端口检测到服务器内部的故障,如根据服务器处理网页返回的状态码、超时等,并会把返回错误的请求重新提交到另一个节点。 6、不仅仅是优秀的负载均衡器/反向代理软件,同时也是强大的Web应用服务器。LNMP也是近些年非常流行的Web架构,在高流量环境中稳定性也很好。 7、可作为中层反向代理使用。 8、可作为静态网页和图片服务器。 9、Nginx社区活跃,第三方模块非常多,相关的资料在网上比比皆是。 Nginx常规的和HTTP请求和相应流程图: Nginx缺点: 1、适应范围较小,仅能支持http、https、Email协议。 2、对后端服务器的健康检查,只支持通过端口检测,不支持url来检测。比如用户正在上传一个文件,而处理该上传的节点刚好在上传过程中出现故障,Nginx会把上传切到另一台服务器重新处理,而LVS就直接断掉了,如果是上传一个很大的文件或者很重要的文件的话,用户可能会因此而不满。 二、LVS优点: 1、抗负载能力强、是工作在网络4层之上仅作分发之用,没有流量的产生,这个特点也决定了它在负载均衡软件里的性能最强的,对内存和cpu资源消耗比较低。 2、配置性比较低,这是一个缺点也是一个优点,因为没有可太多配置的东西,所以并不需要太多接触,大大减少了人为出错的几率。 3、工作稳定,因为其本身抗负载能力很强,自身有完整的双机热备方案,如LVS+Keepalived,不过我们在项目实施中用得最多的还是LVS/DR+Keepalived。 4、无流量,LVS只分发请求,而流量并不从它本身出去,这点保证了均衡器IO的性能不会收到大流量的影响。 5、应用范围比较广,因为LVS工作在4层,所以它几乎可以对所有应用做负载均衡,包括http、数据库、在线聊天室等等。 LVS DR(Direct Routing)模式的网络流程图: LVS的缺点: 1、软件本身不支持正则表达式处理,不能做动静分离;而现在许多网站在这方面都有较强的需求,这个是Nginx/HAProxy+Keepalived的优势所在。 2、如果是网站应用比较庞大的话,LVS/DR+Keepalived实施起来就比较复杂了,特别后面有Windows Server的机器的话,如果实施及配置还有维护过程就比较复杂了,相对而言,Nginx/HAProxy+Keepalived就简单多了。 三、HAProxy优点: 1、HAProxy是支持虚拟主机的,可以工作在4、7层(支持多网段) 2、HAProxy的优点能够补充Nginx的一些缺点,比如支持Session的保持,Cookie的引导;同时支持通过获取指定的url来检测后端服务器的状态。 3、HAProxy跟LVS类似,本身就只是一款负载均衡软件;单纯从效率上来讲HAProxy会比Nginx有更出色的负载均衡速度,在并发处理上也是优于Nginx的。 4、HAProxy支持TCP协议的负载均衡转发,可以对MySQL读进行负载均衡,对后端的MySQL节点进行检测和负载均衡,大家可以用LVS+Keepalived对MySQL主从做负载均衡。 5、HAProxy负载均衡策略非常多,HAProxy的负载均衡算法现在具体有如下8种 ① roundrobin 表示简单的轮询,每个服务器根据权重轮流使用,在服务器的处理时间平均分配的情况下这是最流畅和公平的算法。该算法是动态的,对于实例启动慢的服务器权重会在运行中调整。最大支持4095个后端主机; ② leastconn 连接数最少的服务器优先接收连接。leastconn建议用于长会话服务,例如LDAP、SQL、TSE等,而不适合短会话协议。如HTTP.该算法是动态的,对于实例启动慢的服务器权重会在运行中调整。 ③ static-rr 每个服务器根据权重轮流使用,类似roundrobin,但它是静态的,意味着运行时修改权限是无效的。另外,它对服务器的数量没有限制。该算法一般不用; ④ source 对请求源IP地址进行哈希,用可用服务器的权重总数除以哈希值,根据结果进行分配。只要服务器正常,同一个客户端IP地址总是访问同一个服务器。如果哈希的结果随可用服务器数量而变化,那么客户端会定向到不同的服务器;该算法一般用于不能插入cookie的Tcp模式。它还可以用于广域网上为拒绝使用会话cookie的客户端提供最有效的粘连;该算法默认是静态的,所以运行时修改服务器的权重是无效的,但是算法会根据“hash-type”的变化做调整。 ⑤ uri 表示根据请求的URI左端(问号之前)进行哈希,用可用服务器的权重总数除以哈希值,根据结果进行分配。只要服务器正常,同一个URI地址总是访问同一个服务器。一般用于代理缓存和反病毒代理,以最大限度的提高缓存的命中率。该算法只能用于HTTP后端;该算法一般用于后端是缓存服务器;该算法默认是静态的,所以运行时修改服务器的权重是无效的,但是算法会根据“hash-type”的变化做调整。 ⑥ url_param 在HTTP GET请求的查询串中查找<param>中指定的URL参数,基本上可以锁定使用特制的URL到特定的负载均衡器节点的要求;该算法一般用于将同一个用户的信息发送到同一个后端服务器;该算法默认是静态的,所以运行时修改服务器的权重是无效的,但是算法会根据“hash-type”的变化做调整。 ⑦ hdr(name) 在每个HTTP请求中查找HTTP头<name>,HTTP头<name>将被看作在每个HTTP请求,并针对特定的节点;如果缺少头或者头没有任何值,则用roundrobin代替;该算法默认是静态的,所以运行时修改服务器的权重是无效的,但是算法会根据“hash-type”的变化做调整。 ⑧ rdp-cookie(name) 为每个进来的TCP请求查询并哈希RDP cookie<name>;该机制用于退化的持久模式,可以使同一个用户或者同一个会话ID总是发送给同一台服务器。如果没有cookie,则使用roundrobin算法代替;该算法默认是静态的,所以运行时修改服务器的权重是无效的,但是算法会根据“hash-type”的变化做调整。 haproxy的工作模型图: HAPorxy缺点: 1. 不支持POP/SMTP协议 2. 不支持SPDY协议 3. 不支持HTTP cache功能。现在不少开源的lb项目,都或多或少具备HTTP cache功能。 4. 重载配置的功能需要重启进程,虽然也是soft restart,但没有Nginx的reaload更为平滑和友好。 5. 多进程模式支持不够好 ——————— 作者:qi_li_juan 来源:CSDN 原文:https://blog.csdn.net/qlj324513/article/details/81541282 版权声明:本文为博主原创文章,转载请附上博文链接!
View Detailsnginx实现请求的负载均衡 + keepalived实现nginx的高可用
前言 使用集群是网站解决高并发、海量数据问题的常用手段。当一台服务器的处理能力、存储空间不足时,不要企图去换更强大的服务器,对大型网站而言,不管多么强大的服务器,都满足不了网站持续增长的业务需求。这种情况下,更恰当的做法是增加一台服务器分担原有服务器的访问及存储压力。通过负载均衡调度服务器,将来自浏览器的访问请求分发到应用服务器集群中的任何一台服务器上,如果有更多的用户,就在集群中加入更多的应用服务器,使应用服务器的负载压力不再成为整个网站的瓶颈。 摘自《大型网站技术架构_核心原理与案例分析》 另外,大家可以看我的这两篇博客:LVS + keepalived + nginx + tomcat 实现主从热备 + 负载均衡 和 主从热备+负载均衡(LVS + keepalived),对比下这三篇博客,其中区别及各自的优缺点需要大家好好体会。 环境准备 192.168.0.221:nginx + keepalived master 192.168.0.222:nginx + keepalived backup 192.168.0.223:tomcat 192.168.0.224:tomcat 虚拟ip(VIP):192.168.0.200,对外提供服务的ip,也可称作浮动ip 各个组件之间的关系图如下: tomcat做应用服务器 tomcat的安装不在本博客范围之内,具体可参考virtualBox安装centos,并搭建tomcat,tomcat的webapps下记得放自己的应用,我的是myWeb,如果大家也用我的myWeb,那么index.jsp中的ip需要换成自己的 将192.168.0.223、192.168.0.224上的tomcat启动起来,tomcat的路径可能和我的不一致,需要写成自己的 # cd /usr/local/tomcat7/bin # ./startup.sh 访问myWeb如下 nginx做负载均衡 nginx的安装,本文就不讲述了,具体可参考LVS + keepalived + nginx + tomcat 实现主从热备 + 负载均衡 nginx.conf内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
user root; #运行用户 worker_processes 1; #启动进程,通常设置成和cpu的数量相等 #全局错误日志及PID文件 error_log /usr/local/nginx/logs/error.log; error_log /usr/local/nginx/logs/error.log notice; error_log /usr/local/nginx/logs/error.log info; pid /usr/local/nginx/logs/nginx.pid; # 工作模式及连接数上线 events { use epoll; #epoll是多路复用IO(I/O Multiplexing)中的一种方式,但是仅用于linux2.6以上内核,可以大大提高nginx的性能 worker_connections 1024; #单个后台worker process进程的最大并发链接数 } #设定http服务器,利用它的反向代理功能提供负载均衡支持 http { include mime.types; default_type application/octet-stream; #设定请求缓冲 server_names_hash_bucket_size 128; client_header_buffer_size 32K; large_client_header_buffers 4 32k; # client_max_body_size 8m; #sendfile 指令指定 nginx 是否调用 sendfile 函数(zero copy 方式)来输出文件,对于普通应用, #必须设为 on,如果用来进行下载等应用磁盘IO重负载应用,可设置为 off,以平衡磁盘与网络I/O处理速度,降低系统的uptime. sendfile on; tcp_nopush on; tcp_nodelay on; #连接超时时间 keepalive_timeout 65; #开启gzip压缩,降低传输流量 gzip on; gzip_min_length 1k; gzip_buffers 4 16k; gzip_http_version 1.1; gzip_comp_level 2; gzip_types text/plain application/x-javascript text/css application/xml; gzip_vary on; #添加tomcat列表,真实应用服务器都放在这 upstream tomcat_pool { #server tomcat地址:端口号 weight表示权值,权值越大,被分配的几率越大; server 192.168.0.223:8080 weight=4 max_fails=2 fail_timeout=30s; server 192.168.0.224:8080 weight=4 max_fails=2 fail_timeout=30s; } server { listen 80; #监听端口 server_name localhost; #默认请求设置 location / { proxy_pass http://tomcat_pool; #转向tomcat处理 } #所有的jsp页面均由tomcat处理 location ~ \.(jsp|jspx|dp)?$ { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_pass http://tomcat_pool; #转向tomcat处理 } #所有的静态文件直接读取不经过tomcat,nginx自己处理 location ~ .*\.(htm|html|gif|jpg|jpeg|png|bmp|swf|ioc|rar|zip|txt|flv|mid|doc|ppt|pdf|xls|mp3|wma)$ { expires 30d; } location ~ .*\.(js|css)?$ { expires 1h; } #定义错误提示页面 error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } } } |
主从nginx的配置文件完全一样,nginx.conf配置可复杂可简单,大家根据自己的情况自行配置,照搬上述配置也是可以的。 配置好后,启动nginx,路径要写自己的 # cd /usr/local/nginx/sbin # ./nginx 访问nginx,效果如下: 两台nginx服务器服务正常,此时是没有主从之分的,两者级别一样高,当配置keepalived之后就有了主从之分了。 keepalived实现nginx高可用(HA) keepalived的安装本文就不讲述了,具体可参考主从热备+负载均衡(LVS + keepalived) keepalived作用其实在第一张图中已经有所体现,主要起到两个作用:实现VIP到本地ip的映射; 以及检测nginx状态。 master上的keepalived.conf内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
global_defs { notification_email { 997914490@qq.com } notification_email_from sns-lvs@gmail.com smtp_server smtp.hysec.com smtp_connection_timeout 30 router_id nginx_master # 设置nginx master的id,在一个网络应该是唯一的 } vrrp_script chk_http_port { script "/usr/local/src/check_nginx_pid.sh" #最后手动执行下此脚本,以确保此脚本能够正常执行 interval 2 #(检测脚本执行的间隔,单位是秒) weight 2 } vrrp_instance VI_1 { state MASTER # 指定keepalived的角色,MASTER为主,BACKUP为备 interface eth0 # 当前进行vrrp通讯的网络接口卡(当前centos的网卡) virtual_router_id 66 # 虚拟路由编号,主从要一直 priority 100 # 优先级,数值越大,获取处理请求的优先级越高 advert_int 1 # 检查间隔,默认为1s(vrrp组播周期秒数) authentication { auth_type PASS auth_pass 1111 } track_script { chk_http_port #(调用检测脚本) } virtual_ipaddress { 192.168.0.200 # 定义虚拟ip(VIP),可多设,每行一个 } } |
backup上的keepalived.conf内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
global_defs { notification_email { 997914490@qq.com } notification_email_from sns-lvs@gmail.com smtp_server smtp.hysec.com smtp_connection_timeout 30 router_id nginx_backup # 设置nginx backup的id,在一个网络应该是唯一的 } vrrp_script chk_http_port { script "/usr/local/src/check_nginx_pid.sh" interval 2 #(检测脚本执行的间隔) weight 2 } vrrp_instance VI_1 { state BACKUP # 指定keepalived的角色,MASTER为主,BACKUP为备 interface eth0 # 当前进行vrrp通讯的网络接口卡(当前centos的网卡) virtual_router_id 66 # 虚拟路由编号,主从要一直 priority 99 # 优先级,数值越大,获取处理请求的优先级越高 advert_int 1 # 检查间隔,默认为1s(vrrp组播周期秒数) authentication { auth_type PASS auth_pass 1111 } track_script { chk_http_port #(调用检测脚本) } virtual_ipaddress { 192.168.0.200 # 定义虚拟ip(VIP),可多设,每行一个 } } |
nginx检测脚本check_nginx_pid.sh内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
#!/bin/bash A=`ps -C nginx --no-header |wc -l` if [ $A -eq 0 ];then /usr/local/nginx/sbin/nginx #重启nginx if [ `ps -C nginx --no-header |wc -l` -eq 0 ];then #nginx重启失败 exit 1 else exit 0 fi else exit 0 fi |
启动keepalived # service keepalived start 访问VIP,效果如下: 我们来看下keepalived的日志信息 master(192.168.0.221): backup(192.168.0.222): 当我们把master上的keepalived停掉(模拟宕机),再来看下keepalived日志 原master(192.168.0.221): 原backup(192.168.0.222): 通过VIP可以正常访问服务,前端请求感受不到后端nginx的切换;重新唤醒原master(192.168.0.221)的测试这里就不进行了,大家自行测试 注意点 1、执行脚本时报错:/bin/sh^M: bad […]
View Detailskeepalived实现双机热备
keepalived的作用是检测后端TCP服务的状态,如果有一台提供TCP服务的后端节点死机,或者工作出现故障,keepalived会及时检测到,并将有故障的节点从系统中剔除,当提供TCP服务的节点恢复并且正常提供服务后keepalived会自动将TCP服务的节点加入到集群中。这些工作都是keepalived自动完成,不需要人工干涉,需要人工做的只是修复发生故障的服务器,以下通过示例来演示。 前提:为了测试能顺利进行,需先关闭selinux和firewalld。 测试环境如下: 1 2 3 4 5 keepalived主机: 10.0.0.20 keepalived备机: 10.0.0.21 http服务器1: 10.0.0.22 http服务器2: 10.0.0.23 VIP : 10.0.0.100 一、两台http服务器的安装 1、 两台机均安装httpd 1 $ sudo yum install -y httpd 2、 添加首页 1 2 3 4 5 6 7 $ sudo -i #http服务器1设置 # echo “10.0.0.22” >/var/www/html/index.html #http服务器2设置 # echo “10.0.0.23” >/var/www/html/index.html 3、 启动并设置开机启动httpd 1 2 $ sudo systemctl start httpd $ sudo systemctl enable httpd 二、两台keepalived主机的设置 1、 两台机均安装keepalived 1 2 3 #安装依赖文件与keepalive $ sudo yum install -y openssl openssl-devel keepalived 2、 keepalived主机配置 1 2 3 4 […]
View Details详解keepalived配置和使用
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://lanlian.blog.51cto.com/6790106/1303195 一、keepalived简介: keepalived是一个类似于layer3, 4 & 5交换机制的软件,也就是我们平时说的第3层、第4层和第5层交换。Keepalived的作用是检测web服务器的状态,如果有一台web服务器死机,或工作出现故障,Keepalived将检测到,并将有故障的web服务器从系统中剔除,当web服务器工作正常后Keepalived自动将web服务器加入到服务器群中,这些工作全部自动完成,不需要人工干涉,需要人工做的只是修复故障的web服务器。 工作原理 Layer3,4&5工作在IP/TCP协议栈的IP层,TCP层,及应用层,原理分别如下: Layer3:Keepalived使用Layer3的方式工作式时,Keepalived会定期向服务器群中的服务器发送一个ICMP的数据包(既我们平时用的Ping程序),如果发现某台服务的IP地址没有激活,Keepalived便报告这台服务器失效,并将它从服务器群中剔除,这种情况的典型例子是某台服务器被非法关机。Layer3的方式是以服务器的IP地址是否有效作为服务器工作正常与否的标准。 Layer4:如果您理解了Layer3的方式,Layer4就容易了。Layer4主要以TCP端口的状态来决定服务器工作正常与否。如web server的服务端口一般是80,如果Keepalived检测到80端口没有启动,则Keepalived将把这台服务器从服务器群中剔除。 Layer5:Layer5就是工作在具体的应用层了,比Layer3,Layer4要复杂一点,在网络上占用的带宽也要大一些。Keepalived将根据用户的设定检查服务器程序的运行是否正常,如果与用户的设定不相符,则Keepalived将把服务器从服务器群中剔除。 二、实验步骤: 1.创建管理节点在node1上,建立双机互信node1和node2,然后同步时间,安装keepalived 1 2 3 4 [root@node1~]# ansible all -m yum -a 'name=keepalived state=present' [root@node1keepalived]# rpm -qc keepalived /etc/keepalived/keepalived.conf//生成的主配置文件 /etc/sysconfig/keepalived 2.在node1上配置文件需要做一下修改 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 global_defs{ notification_email { root@localhost //收邮件人,可以定义多个 } notification_email_from kaadmin@localhost //发邮件人可以伪装 smtp_server 127.0.0.1 //发送邮件的服务器地址 smtp_connect_timeout 30 //连接超时时间 router_id LVS_DEVEL } vrrp_instanceVI_1 { //每一个vrrp_instance就是定义一个虚拟路由器的 state MASTER //由初始状态状态转换为master状态 interface eth0 virtual_router_id 51 //虚拟路由的id号,一般不能大于255的 priority 100 //初始化优先级 advert_int 1 //初始化通告 authentication { //认证机制 auth_type […]
View Details用rabbitMq解决web高并发的学习笔记
引入pom.xml
1 2 3 4 5 6 7 |
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/ https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
application.yml 配置部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#rabbitmq rabbitmq: host: 10.0.0.2 port: 5672 username: springboot password: password publisher-confirms: true publisher-returns: true template: mandatory: true #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java listener: concurrency: 2 #最小消息监听线程数 max-concurrency: 2 #最大消息监听线程数 mybatis: mapper-locations: classpath:mapping/*.xml ··· |
创建配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //错误的 import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。 * * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * * Routing Key:路由关键字, * * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. * * */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String QUEUE_A = "QUEUE_A"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * * 针对消费者配置 * * 1. 设置交换机类型 * * 2. 将队列绑定到交换机 * * FanoutExchange: * 将消息分发到所有的绑定队列,无routingkey的概念 * * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。 常用交换器类型如下: Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 Topic(TopicExchange):按规则转发消息(最灵活)。 Headers(HeadersExchange):设置header attribute参数类型的交换机。 Fanout(FanoutExchange):转发消息到所有绑定队列。 * */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 获取队列A * * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); // 队列持久 } /** * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } } |
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package com.redis; import java.util.UUID; import org.springframework.amqp.core.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; @Component public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入 private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); // rabbitTemplate如果为单例的话,那回调就是最后设置的内容 /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 */ rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } /** * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。 * @param content */ public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId); String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString(); System.out.println("结束发送消息c : " + content.toLowerCase()); System.out.println("消费者响应c : " + response + " 消息处理完成"); //logger.info(" 发送消息TO A:" + content); // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } } |
Receiver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String content) { System.out.println("接收处理队列A当中的消息: " + content); } } |
unit test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.redis.MsgProducer; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private MsgProducer sender; @Test public void sendTest() throws Exception { //while(true){ String msg = new Date().toString(); sender.sendMsg(msg); Thread.sleep(6000); //} } } |
可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
package com.test.ratelimit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; public class ComplexDemo { private static RateLimiter rateLimiter = RateLimiter.create(10); private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0); public static void main(String[] args) { // TODO Auto-generated method stub List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 100; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { threadPool.execute(runnable); } } private static boolean startGo(int i) { //基于令牌桶算法的限流实现类 /** * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。 * */ /** * tryAcquire(long timeout, TimeUnit unit) * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) */ //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("暂时无法获取令牌, 排队失败" + i); fail.getAndIncrement(); System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get()); return false; } if (update() > 0) { System.out.println("成功" + i); suc.getAndIncrement(); System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get()); return true; } System.out.println("数据不足,失败"); return false; } private static int update() { return 1; } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { startGo(id) ; } } } 测试结果: ... 成功8 FAIL/SUC=89/10 成功7 FAIL/SUC=89/11 |
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
View Details服务级高并发秒杀优化(RabbitMQ+接口优化)
1. 安装RabbitMQ linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。 erlang下载地址:http://www.erlang.org/download.html rabbitMQ下载:http://www.rabbitmq.com/download.html 直接下载安装即可。 因为想用可视化界面监控消息,所以先激活这个功能。
1 2 |
//到rabbitMQ安装目录的sbin目录下启动cmd黑窗口 E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management |
然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。 2. rabbitMQ基本原理和使用 rabbitMQ原理 Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下 客户端连接到消息队列服务器,打开一个channel。 客户端声明一个exchange,并设置相关属性。 客户端声明一个queue,并设置相关属性。 客户端使用routing key,在exchange和queue之间建立好绑定关系。 客户端投递消息到exchange。 总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 Direct交换机 完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。 Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。 Topic交换机 对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。 我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。 Fanout交换机 还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。 Headers交换机 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。 持久化 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: – exchange持久化,在声明时指定durable => 1 – queue持久化,在声明时指定durable => 1 – 消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 3. rabbitMQ-Direct交换机 […]
View Details.Net高并发解决思路
首先在windows上安装好Redis,RabbitMQ Redis-cli使用示例 ModelContext.cs代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class ModelContext : DbContext { //您的上下文已配置为从您的应用程序的配置文件(App.config 或 Web.config) //连接字符串。 public ModelContext() : base("name=default") { } public virtual DbSet<Person> Person { get; set; } } public class Person { public int Id { get; set; } public string Id2 { get; set; } public string Name { get; set; } } |
在 Package Manager Console 下运行命令 Enable-Migrations 这个命令将在项目下创建文件夹 Migrations The Configuration class 这个类允许你去配置如何迁移,对于本文将使用默认的配置(在本文中因为只有一个 Context, Enable-Migrations 将自动对 context type 作出适配); An InitialCreate migration (本文为201702220232375_20170222.cs)这个迁移之所以存在是因为我们之前用 Code First 创建了数据库, 在启用迁移前,scaffolded migration 里的代码表示在数据库中已经创建的对象,本文中即为表 Person(列 Id 和 Name). 文件名包含一个 timestamp 以便排序(如果之前数据库没有被创建,那么 InitialCreate migration 将不会被创建,相反,当我们第一次调用 Add-Migration 的时候所有表都将归集到一个新的 migration 中) 多个实体锁定同一数据库 当使用 EF6 之前的版本时,只会有一个 Code First Model 被用来生成/管理数据库的 Schema, 这将导致每个数据库只会有一张 __MigrationsHistory 表,从而无法辨别实体与模型的对应关系。 从 EF6 开始,Configuration 类将会包含一个 ContextKey 属性,它将作为每一个 Code First Model 的唯一标识符, __MigrationsHistory 表中一个相应地的列允许来自多个模型(multiple models)的实体共享表(entries),默认情况下这个属性被设置成 context 的完全限定名。 定制化迁移 在 Package Manager Console 中运行命令 Add-Migration XXXXXXXXX 生成的迁移如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public partial class _20170222 : DbMigration { public override void Up() { CreateTable( "dbo.People", c => new { Id = c.Int(nullable: false, identity: true), Id2 = c.String(), Name = c.String(), }) .PrimaryKey(t => t.Id); } public override void Down() { DropTable("dbo.People"); } } |
Configuration.cs代码:
1 2 3 4 5 6 7 8 9 10 11 12 |
internal sealed class Configuration : DbMigrationsConfiguration<EF.ModelContext> { public Configuration() { AutomaticMigrationsEnabled = false; } protected override void Seed(EF.ModelContext context) { } } |
我们对迁移做些更改: […]
View DetailsRabbitMQ的几种典型使用场景
AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } |
场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } |
发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue […]
View Details高并发场景之RabbitMQ篇
上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁 这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景 一、新建RabbitMQ.Receive
1 2 |
private static ConnectionFactory factory = new ConnectionFactory { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" }; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
1 static void Main(string[] args) 2 { 3 using (var connection = factory.CreateConnection()) 4 { 5 using (var channel = connection.CreateModel()) 6 { 7 var consumer = new EventingBasicConsumer(); 8 consumer.Received += (model, ea) => 9 { 10 var body = ea.Body; 11 var message = Encoding.UTF8.GetString(body); 12 Console.WriteLine(" [x] Received {0}", message); 13 14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 15 var value = int.Parse(total) + 1; 16 17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 18 }; 19 20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer); 22 23 Console.WriteLine(" Press [enter] to exit."); 24 Console.ReadLine(); 25 } 26 } 27 } |
二、新建RabbitMQ.Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
1 static void Main(string[] args) 2 { 3 for (int i = 1; i <= 500; i++) 4 { 5 Task.Run(async () => 6 { 7 await Produce(); 8 }); 9 10 Console.WriteLine(i); 11 } 12 13 Console.ReadKey(); 14 } 15 16 public static Task Produce() 17 { 18 return Task.Factory.StartNew(() => 19 { 20 using (var connection = factory.CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); 25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body); 27 } 28 } 29 }); 30 } |
这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行 最后数据是1000 完全没有冲突,好了,就是这样 。 from: https://www.cnblogs.com/lanxiaoke/p/6659548.html
View Details