烂笔头

不积跬步无以至千里

0%

1
2
3
192.168.172.201  控制节点
192.168.172.202 工作节点
192.168.172.203 工作节点

准备工作

关闭防火墙

1
2
systemctl stop firewalld
systemctl disable firewalld

关闭selinux

1
2
sed -i 's/enforcing/disabled/' /etc/selinux/config
setenforce 0 #临时关闭

关闭swap

1
2
sed -ri 's/.*swap.*/#&/' /etc/fstab
swapoff -a #临时关闭

根据规划设置主机名

1
2
3
4
5
6
7
8
9
10
hostnamectl set-hostname cluster01  #192.168.172.201
hostnamectl set-hostname cluster02 #192.168.172.202
hostnamectl set-hostname cluster03 #192.168.172.203

# 在master添加hosts
cat >> /etc/hosts << EOF
192.168.172.201 cluster01
192.168.172.202 cluster02
192.168.172.203 cluster03
EOF

将桥接的IPv4流量传递到iptables的链

1
2
3
4
5
6
7
cat > /etc/sysctl.d/k8s.conf << EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF

# 生效
sysctl --system

安装

添加kubernetes软件源

1
2
3
4
5
6
7
8
9
cat > /etc/yum.repos.d/kubernetes.repo << EOF
[kubernetes]
name=Kubernetes
baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF

安装kubeadm kubectl kubelet

1
2
yum install -y kubelet-1.18.0 kubeadm-1.18.0 kubectl-1.18.0
systemctl enable kubelet

部署 master

1
2
3
4
5
6
7
kubeadm init --apiserver-advertise-address=192.168.172.201 --image-repository registry.aliyuncs.com/google_containers --kubernetes-version v1.18.0 --service-cidr=10.96.0.0/12  --pod-network-cidr=10.244.0.0/16
# 按控制台输出执行
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
# 默认token有效期为24小时,当过期之后,该token就不可用了
kubeadm token create --print-join-command >> kube_join

加入工作节点

1
kubeadm join 192.168.172.201:6443 --token u3iqxs.b4rj1oboeoiwnxx4     --discovery-token-ca-cert-hash sha256:aeec473887aec0f48c9a5023d467febc8a859f3981c8ef04698e4ec2ae32273b

安装CNI插件

1
2
3
4
5
6
wget https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml
#修改源
sed -i 's/quay.io/quay.mirrors.ustc.edu.cn/' kube-flannel.yml
kubectl apply -f ./kube-flannel.yml
#查看pods状态
kubectl get pods -n kube-system

测试

1
2
3
4
5
6
7
8
9
10
11
12
kubectl create deployment nginx --image=nginx

# 暴露端口
kubectl expose deployment nginx --port=80 --type=NodePort
# 查看一下对外的端口
[root@localhost ~]# kubectl get pod,svc
NAME READY STATUS RESTARTS AGE
pod/nginx-f89759699-wfmpn 1/1 Running 0 3m37s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 51m
service/nginx NodePort 10.101.146.235 <none> 80:30067/TCP 10s

访问http://192.168.172.20x:30067 可以看到nginx欢迎页面

基本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 自旋实现锁
*/
public class MyLock implements ILock {
private volatile int status = 0;

public void lock() {
while (!compareAndSwap(1)) {
System.out.println(Thread.currentThread().getName() + "waiting");
//v1 自旋+yeild 让出CPU,还是有很大几率被CPU重新调度,CPU消耗大
//Thread.yield();
//sleep + 自旋 让出CPU,但是sleep时长难以确定
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
System.out.println(Thread.currentThread().getName() + "lock,");
}

private boolean compareAndSwap(int newValue) {
if (0 == status) {
status = newValue;
return true;
}
return false;
}

public void unlock() {
status = 0;
System.out.println(Thread.currentThread().getName() + "unlock");
}
}

park实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* park+自旋
*/
public class ParkLock implements ILock {
private volatile int status = 0;
private Queue<Thread> queue = new ConcurrentLinkedQueue<>();

public void lock() {
while (!compareAndSwap(1)) {
System.out.println(Thread.currentThread().getName() + "waiting");
threadPark();
}
System.out.println(Thread.currentThread().getName() + "lock,");
}

private void threadPark() {
queue.add(Thread.currentThread());
status = 1;
LockSupport.park();
}

private boolean compareAndSwap(int newValue) {
if (0 == status) {
status = newValue;
return true;
}
return false;
}

public void unlock() {
status = 0;
Thread poll = queue.poll();
LockSupport.unpark(poll);
System.out.println(Thread.currentThread().getName() + "unlock");
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CountRun implements Runnable {
private static ParkLock lock = new ParkLock();
private final CountDownLatch latch;

public CountRun(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
lock.lock();
for (int i = 0; i < 100; i++) {
CountMain.INIT_VALUE++;
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.unlock();
latch.countDown();
}
}

public class CountMain {
public static int INIT_VALUE = 0;
public static final int LIMIT = 3;

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(LIMIT);
new Thread(new CountRun(countDownLatch)).start();
new Thread(new CountRun(countDownLatch)).start();
new Thread(new CountRun(countDownLatch)).start();
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + ":" + CountMain.INIT_VALUE);
}
}

第一章

寄存器

CPU中可以存储数据的器件,一个CPU中包含多个寄存器。
MOV AX,BX
AX 是其中一个寄存器代号
BX 是其中另一个寄存器代号
它们有不同的作用,存放的东西也不同,有些存放数据,有些存放指令

汇编语言的组成

  1. 汇编指令(机器码的助记符)
  2. 伪指令(由编译器执行)
  3. 其他符号(由编译器识别)
    编译器将伪指令转换成逻辑运算由CPU执行,如+会转换成&|

    存储器

    CPU是计算机的核心部件,它控制整个计算机的运作并进行运算,要想让一个CPU工作,就必须向他提供指令和数据。
    指令和数据在寄存器中存放,也就是平时所说的内存
    • 存储单元
      每个存储器被划分为若干个存储单元,存储单元从0开始顺序编号
      如:
      一个存储器有128个存储单元,
      编号从0~127.
    • CPU要想进行数据读写,必须和外部芯片进行三类信息交互:
    1. 存储单元的地址(地址信息)
    2. 芯片的选择、读或写命令(控制信息)
    3. 读或写的数据(数据信息)
    • CPU通过地址总线来指定存储单元
      一个CPU有N根地址总线,则可以说这个CPU的地址总线宽度为N。
      这样的CPU最多可以寻找2的N次方个内存单元
    • 数据总线:CPU与内存或其他器件之间的数据传送是通过数据总线来进行的
      数据总线的宽度决定了CPU与外界的数据传送速度
    • 控制总线 CPU对外部器件的控制是通控制总线来进行的。
      有多少跟控制总线,就意味着CPU提供了多少种对外部器件的控制
      内存读写控制总线控制:
      其中一根名为读信号输出总线负责由CPU向外传送读信号,CPU向该控制线输出低电平表示要读取信息
      有一根名为写信号输出控制线负责由CPU向外发送写信号
      地址总线决定了CPU的寻址能力
      数据总线决定了CPU与其他器件进行数据传输时一次传送量
      控制总线决定了CPU对系统其他器件的控制能力

      内存地址空间

      一个CPU的地址总线宽度是10,那么可以寻址2^10个内存单元,这1024个可寻到的内存单元就构成这个CPU的内存地址空间
  • RAM 随机存储器
  • ROM 只读存储器

    第二章 寄存器(CPU内部工作原理)

    一个典型的CPU包括 运算器 、 控制器 、寄存器,他们通过CPU内部总线相连(区别于外部总线)
    8086CPU由14个寄存器:AX BX CX DX SI DI SP BP IP CS SS DS ES PSW
    8086CPU所有寄存器都是16位,可以存放2个字节
    AX BX CX DX 通常用来存放一般性的数据,被称为通用寄存器
    2^4 汇编指令
    mov al,0002H
    add al,al
    add al,al
    add al,al
    8086CPU有4个段寄存器:
    CS DS SS ES
    当要访问内存时,由这4个段寄存器提供内存单元的段地址
    物理地址=段地址 * 16 + 偏移地址
    CS 存放指令的段地址
    IP 存放指令偏移地址
    8086机中,任意时刻,CPU将CS:IP指向的内容当作指令执行
    jmp 2AE3:3 访问2AE30+0003 = 2AE33 的内存地址

仅修改IP:
jmp ax (类似mov IP,ax)会将ax内容放入IP中

寄存器(内存)

执行后的值寄存器ax,bx,cx的值
内存数据
| 10000H | 23 |
| 10001H | 11 |
| 10002H | 22 |
| 10003H | 66 |

mov ax,1000H ax=1000
mov ds,ax ds=1000
mov ax,[0] ax=1123
mov bx,[2] bx=6622
mov cx,[1] cx=2211
add bx,[1] bx=6622+2211 = 8833
add cx,[2] cx=2211+6622 = 8833

执行后的值寄存器ax,bx,cx的值

| 10000H | 23 |
| 10001H | 11 |
| 10002H | 22 |
| 10003H | 11 |

mov ax,1000H ax=1000
mov ds,ax ds=1000
mov ax,11316 ax=2C34
mov [0],ax
mov bx,[0] bx=2C34
sub bx,[2] bx=2C34-1122 = 1B12
mov [2],bx

将10000H~1000FH这段空间当作栈,初始状态为空,将AX BX DS的数据入栈
mov ax,1000
mov ss,ax
mov sp,0010(000F + 1)
push ax
push bx
push ds

将10000H~1000FH这段空间当作栈,初始状态为空,设置ax=001AH,bx=001BH
将ax,bx入栈,然后将ax,bx清零
从栈中恢复ax,bx
mov ax,1000
mov ss,ax
mov sp,0010
mov ax,001A
mov BX,001B
push ax
push bx
sub ax,ax
sub bx,bx
pop bx
pop ax
当前栈顶试bx,要交换ax,bx可以pop ax pop bx

kafka集群版本:2.6.0

发生的原因

默认kafka集群节点配置(config/server.properties)中包含如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://host_name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

可以发现,默认广播给producers和consumers的节点host是通过java.net.InetAddress.getCanonicalHostName()获取的,这个方法获取到的数据是centos设置的hostname
lua-resty-kafka运行在nginx中,nginx不能解析本地的hostname,导致no resolver defined to resolve问题。

阅读全文 »

分布式锁可以避免不同节点重复相同的工作,也可以进行分布式节点的任务进行协调。
实现分布式锁的方式有很多,zookeeper的实现也比较简单

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<version>3.5.8</version>
</dependency>

将zookeeper配置外置

1
2
3
4
5
6
7
8
9
10
11
@ConfigurationProperties(prefix = ZookeeperProperties.PREFIXX)
public class ZookeeperProperties {
static final String PREFIXX = "zoo";

private List<String> nodes;

private int sessionTimeout = 5000;

...getter
...setter
}

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Slf4j
public class ZookeeperSession implements InitializingBean, DisposableBean {
private final String nodes;
private final int sessionTimeout;
private ZooKeeper zookeeper;
private final CountDownLatch connectedSemaphore = new CountDownLatch(1);

public ZookeeperSession(ZookeeperProperties properties) {
this.sessionTimeout = properties.getSessionTimeout();
this.nodes = StringUtils.join(properties.getNodes(), ",");
}

/**
* 获取锁
*
* @param productId 商品标识
*/
public void acquireDistributedLock(Long productId) {
String path = getPath(productId);
byte[] data = "".getBytes();
//尝试创建锁
int retry = 1;
while (true) {
try {
this.zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info("获取锁成功,尝试{}次", retry);
return;
} catch (Exception exception) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
log.warn("interrupted error");
}
retry++;
}
}
}

/**
* 释放锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
try {
this.zookeeper.delete(getPath(productId), -1);
log.debug("release distributedLock ,{}", productId);
} catch (Exception e) {
log.error("release distributedLock error", e);
}
}

private String getPath(Long productId) {
return "/product-lock-" + productId;
}


@Override
public void afterPropertiesSet() throws Exception {
try {
this.zookeeper = new ZooKeeper(this.nodes, this.sessionTimeout, watchedEvent -> {
Watcher.Event.KeeperState state = watchedEvent.getState();
log.debug("zookeeper state:[{}]", state);
if (state == Watcher.Event.KeeperState.SyncConnected) {
log.info("zookeeper connected");
this.connectedSemaphore.countDown();
}
});
} catch (IOException e) {
log.error("zookeeper connect error", e);
}
this.connectedSemaphore.await();
}

@Override
public void destroy() throws Exception {
this.zookeeper.close();
}
}

注册为spring bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperSessionConf {

private final ZookeeperProperties properties;

public ZookeeperSessionConf(ZookeeperProperties properties) {
this.properties = properties;
}

@Bean
ZookeeperSession zookeeperSession() {
return new ZookeeperSession(properties);
}
}
1
2
3
4
5
zoo:
nodes:
- 192.168.0.201:2181
- 192.168.0.202:2181
- 192.168.0.203:2181

注入ZookeeperSession进行锁定和释放

why cluster

redis replication实现一主多从架构可以实现读写分离
redis sentinel实现一主多从下得高可用
但是却不能突破redis单机瓶颈

阅读全文 »