天天看点

laravel中如何使用消息队列mac安装kafka安装kafka的php扩展producerconsumer

queue配置

首先说明一下我之前的项目中如何使用queue的。

我们现在的项目都是用的symfony,老一点的项目用的symfony1.4,新一点的项目用的都是symfony2。symfony用起来整体感觉还是很爽的,尤其symfony2,整体上来讲使用了很多java里面框架的设计思想。但是他不支持queue。在symfony,我们使用queue也经历了几个过程。最开始使用张堰同学的httpsqs。这个简单使用,但是存在单点。毕竟我们的项目还是正式对外服务的,所以我们研究了Apache旗下的开源项目ActiveMQ,研究研究发现还有Apache旗下还有更新的MQ,那就是Apollo。最后我们决定使用的Apollo。

queue在我们的项目中主要的应用场景就是异步处理一些比较耗时的功能,比如同步第三方数据、数据有变动了同步通知到我们的第三方数据使用者等等。我们大致的思路是这样的,在各个controller里面如果需要异步处理的,就把一个json对象encode一下,塞到Apollo里面。再写一个work的Command,在这个Command中解析json对象,根据里面的action和参数决定来调用不同的方法处理。根据业务需要同时在不同的机器上运行Command作为守护进程一直跑着,也算实现异步多任务处理应用的方案。就这么一直使用着,直到发现了laravel。打算研究一下。如果可能替代一下也不是不可能。呵呵。

由于才开始学习,当然直接上laravel5。routes、controller、view都基本上和symfony差别不到,上手倒是不困难。最后研究一下queue。

1、安装laravle,使用composer,倒是很简单。

?

1 2

composer global require

"laravel/installer=~1.1"

vi

~/.bash_profile

把~/.composer/vendor/bin 加入到环境变量中。

?

1

source

~/.bash_profile

就可以直接在命令行中使用laravel了。试一下。

?

1

laravel -V

能够看到下面的,就代表成功了。

?

1

Laravel Installer version 1.2.1

2、创建项目。

?

1

laravel new guagua

3、配置redis和queue。

4、创建controller,

?

1

php artisan

make

:controller DefaultController

在controller的action中push100个queue的任务。

?

1 2 3

for

(

$i

= 0;

$i

< 100;

$i

++) {

Queue::push(

new

SendEmail(

"ssss"

.

$i

));

}

5、创建queue的Command

?

1

php artisan

make

:

command

SendEmail --queued

修改app/Commands/SendEmail.php,添加一个私有变量。

?

1

protected

$msg

;

同时修改构造函数。

?

1 2 3 4

public

function

__construct(

$msg

)

{

$this

->msg =

$msg

;

}

再修改的handle方法

?

1 2 3 4 5

public

function

handle() {

sleep(4);

echo

$this

->msg.

"\t"

.

date

(

"Y-m-d H:i:s"

).

"\n"

;

$this

->

delete

();

}

6、修改routes

?

1 2 3 4

Route::get(

'/'

, [

'as'

=>

'index'

,

'uses'

=>

'[email protected]'

]);

7、监听queue

?

1

php artisan queue:listen

为了验证多任务处理,我们同时开三个窗口运行同样的命令。

8、用laravel内建的server启动服务

?

1

php artisan serve --port 8080

打开浏览器,访问http://localhost:8080/页面。当然也可以用nginx,apache之类的。但是需要各种配置,还是内建的使用方便。

在控制台就能看到各个queue执行的情况了,如下图。可以看到100个任务被三个work平分了。

laravel中如何使用消息队列mac安装kafka安装kafka的php扩展producerconsumer

到此,基本达到了我想要的效果。验证了laravel可以简单实现queue,并且可以多任务处理。

make command生成的代码中use App\Commands\Command ,但是运行时提示没有这个文件。 解决办法,修改为 use Illuminate\Console\Command; 不知道为什么会出现这个低级问题,难道是我mac系统问题,还是我的人品问题。

在controller的action中push队列的时候,没有异步执行,还是在action的脚本中执行的。 发现是配置问题,原来不仅仅要修改config中的queue.php,还要修改.evn中相关配置。 虽然问题解决了,但是还是觉得蛋疼,不能理解。还需要在学习学习laravel。

异步队列使用方法

1.配置

关于队列的定义,这里就不作介绍了。我们要使用异步队列就有两个关键:

(1)存储队列的地方

(2)执行任务的服务

打开 config/queue.php ,这是Laravel5关于队列的配置文件。首先我们可以通过 default 参数指定默认队列驱动,默认配置是 sync , 这是同步队列,我们要做异步队列首先就要改变这里。假设我们用 database 作为驱动,队列任务将会存放在数据库中,而我们后面会另外启动一个后台服务来处理队列任务,这就是异步方式了。

?

1

'default'

=>

'database'

修改完配置后,我们需要创建一个表来存放队列任务,Laravel5已经在自带artisan命令中内置了一个指令用来生成数据迁移,只需要两条命令即可,当然你得实现配置好数据库连接。

?

1 2

php artisan queue:table

php artisan migrate

这样就自动在数据库中创建了 jobs 表。

2.启动队列监听服务

通过下面这条指令启动队列监听服务,它会自动处理 jobs 表中的队列任务:

?

1

php artisan queue:listen

在linux中,如果想让它在后台执行,可以这样:

?

1

nohup

php artisan queue:listen &

3.添加队列任务

关于队列任务的添加,手册里说的比较详细,这里就简单举个例子吧。

首先,通过artisan创建一个队列命令:

?

1

php artisan

make

:

command

SendEmail --queued

这样会生成 app/Commands/SendEmail.php 这个类文件,这个类会被标识为队列命令,你可以在 handle 方法中写自己的业务逻辑。

在控制器中,可以简单通过 Bus::dispatch 分发任务:

?

1

Bus::dispatch(

new

\App\Commands\SendEmail());

你会发现任务不会立即执行,而是被放到 jobs 表中,由队列监听服务处理。

更详细的用法建议参考 command bus 和 queue 相关的手册章节。

在之前项目进行采集数据时,采用kafka消息队列,也挺不错的,接下来我们来搭建kafka消息队列:

mac安装kafka

1.安装最新版的kafka

brew install kafka
           

这将安装所有的依赖,包括zookeeper

2.启动zookeeper

brew services start zookeeper //启动zookeeper
zkServer start //或者这样启动
           

可以用 

brew info zookeeper

 命令查看zookeeper的相关信息,包括启动命令

3.启动kafka

brew services start kafka //启动kafka
kafka-server-start /usr/local/etc/kafka/server.properties //或者这样启动
           

同样可以用 

brew info kafka

 命令查看kafka的相关信息,包括启动命令

4. 创建一个topic

创建了一个名字为

test

topic

topic

的名字最好是全e文,不要有 

_ .

等特殊符号,可以用以下命令查看创建的 

topic

/usr/local/bin/kafka-topics --list --zookeeper localhost: //查看topic
/usr/local/bin/kafka-topics --delete --zookeeper localhost: --topic entere //删除名为entere的topic
           

5. 发送消息 producer

/usr/local/bin/kafka-console-producer --broker-list localhost: --topic test

hello this is a test message
           

6. 消费消息 consumer

/usr/local/bin/kafka-console-consumer --zookeeper localhost: --topic test --from-beginning
           

7. 配置多个broker集群

到目前为止,我们都是在单个broker上运行的,但是这没啥好玩的。对于Kafka来说,单个broker其实就是一个大小为1的集群,所以对于启动多个broker的实例来说,道理也是一样的,并没有太多变化。但是为了感觉一下他,就让我们将我们的集群扩充道3个节点(仍然全部运行在我们的本地机器上)。 首先我们为每一个broker建一个配置文件:

cp config/server.properties config/server-.properties 
cp config/server.properties config/server-.properties
           

现在,编辑这些新文件,并设置以下属性:

config/server-.properties:
    broker.id=
    port=
    log.dir=/tmp/kafka-logs-

config/server-.properties:
    broker.id=
    port=
    log.dir=/tmp/kafka-logs-
           

其中broker.id属性是一个不重复的常量,用来表示集群中每个节点的名字。我们在这里不得不重写port和log.dir,这只是因为我们是在同一台机器上运行这些命令,而我们要防止多个borker使用同一个端口注册而覆盖彼此的内容。

我们已经有了Zookeeper并且我们的单节点已经启动,所以我们现在需要启动这两个新节点:

/usr/local/bin/kafka-server-start config/server-.properties &
/usr/local/bin/kafka-server-start config/server-.properties &
           

现在创建一个有三个备份因子的新topic:

/usr/local/bin/kafka-topics --create --zookeeper localhost: --replication-factor  --partitions  --topic my-replicated-topic
           

好了,现在我们有一个集群了,但是我们怎么知道每个个broker都在做什么呢?让我们运行

describe topics

命令来看看:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic


Topic:my-replicated-topic    PartitionCount:    ReplicationFactor: Configs:

Topic: my-replicated-topic  Partition:     Leader:    Replicas: ,, Isr: ,,
           

这是上面输出的说明。第一行给出了所有分区的总结,此外每一行都是一个分区的信息。因为我们现在在这个topic上只有两个分区,所以就只有两行。

"leader" 负责给定分区中所有的读和写的任务。分区将随即选取一个节点作为leader。

“replicas” 列出了所有当前分区中的副本节点。不论这些节点是否是leader或者是否处于激活状态,都会被列出来。

“isr” 是表示“在同步中”的副本节点的列表。是replicas列表的一个子集,包含了当前处于激活状态的节点,并且leader节点开头。

注意在我们的例子中,节点1该topic仅有的一个分区中的leader节点。

我们可以在之前我们创建的topic中运行同样的命令,来看看是什么情况:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic test


Topic:test    PartitionCount:    ReplicationFactor: Configs:

Topic: test Partition:     Leader:    Replicas:  Isr: 
           

看,和猜测的一样 -- 在之前的topic下没有副本节点,且其运行在server 0上,它是我们在创建topic时在集群中创建的唯一一个server。

让我们向我们的新topic发布一些消息:

/usr/local/bin/kafka-console-producer --broker-list localhost: --topic my-replicated-topic

my test message 
my test message 
           

现在让我们消费这些消息:

/usr/local/bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic

my test message 
my test message 
           

现在让我们测试一下容错性。Broker 1是其中的leader,让我们关了它:

ps | grep server-.properties

 ttys002    : /System/Library/Frameworks/JavaVM.framework/Versions//Home/bin/java...

kill - 
           

Leader节点转移了,并且1号节点不再存在于“正在同步”的副本集合内:

/usr/local/bin/kafka-topics --describe --zookeeper localhost: --topic my-replicated-topic

Topic:my-replicated-topic    PartitionCount:    ReplicationFactor: Configs:

Topic: my-replicated-topic  Partition:     Leader:    Replicas: ,, Isr: ,
           

但是这些消息仍然可以用来消费,即便是原本负责写的leader节点被关掉了:

bin/kafka-console-consumer --zookeeper localhost: --from-beginning --topic my-replicated-topic

my test message 
my test message 
           

安装kafka的php扩展

brew install homebrew/php/php70-rdkafka
           

我们这里选取了 php70-rdkafka 这个扩展,安装后重启php-fpm,大功告

producer

php发送消息示例

<?php
    public function handle() 
    {
        $title = "传唤华为,没有赢家的阻击战";
        $content = "美国政府继3月份对中兴下重手处罚之后,开始瞄准华为。";
                    //$host_list = "172.16.88.12:9092";
        //$host_list = "172.16.88.11:2181/kafka/q-ksg2na7l";
        $broker = "172.16.88.12:9092";
        //$broker = "localhost:9092";
        $kafka = new \RdKafka\Producer();
        $kafka->setLogLevel(LOG_DEBUG);
        $num = $kafka->addBrokers($broker);
        echo "added $num brokers \r\n";
        $topic = $kafka->newTopic("topic_article_publish");
        for($i = ; $i < ; $i++){
            $msg = [
                'header'=>[
                    'type'=>'article_publish',
                    'time'=>time(),
                ],
                'body'=>[
                    'title'=>$i.'--'.$title,
                    'content'=>$i.'__'.$content,
                ],

            ];
            $topic->produce(RD_KAFKA_PARTITION_UA, , json_encode($msg,JSON_UNESCAPED_UNICODE));
            echo "the $i message sended successfully \r\n";
        }


    }
?>
           

consumer

php接收消息示例

<?php
    public function handle()
    {
        $broker = "172.16.88.12:9092";
        //$broker = "localhost:9092";
        $rk = new \RdKafka\Consumer();
        $rk->setLogLevel(LOG_DEBUG);
        $num = $rk->addBrokers($broker);
        $topic = $rk->newTopic("topic_article_publish");
        $topic->consumeStart(, RD_KAFKA_OFFSET_END);
        //RD_KAFKA_OFFSET_BEGINNING,从partition消息队列的开始进行consume;
        //RD_KAFKA_OFFSET_END,从partition中的将要produce的下一条信息开始(忽略即当前所有的消息)
        //rd_kafka_offset_tail(5),consume 5 messages from the end,取最新5条
        while (true) {
            $msg = $topic->consume(, );
            if(null === $msg){

            } else {
                if ($msg->err) {
                    echo $msg->errstr(), "\n";
                    sleep();

                } else {
                    //var_dump($msg);
                    echo $msg->payload, "\n";
                    echo $msg->offset,"\n";
                }
            }
        }

    }
?>