💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。 Storm是用Java实现的,你看到的所有这本书中的*spout*和*bolt*都是用java编写的。那么有可能使用像Python、Ruby、或者JavaScript这样的语言编写*spout*和*bolt*吗?答案是当然 可以!可以使用*多语言协议*达到这一目的。 多语言协议是Storm实现的一种特殊的协议,它使用标准输入输出作为*spout*和*bolt*进程间的通讯通道。消息以JSON格式或纯文本格式在通道中传递。 我们看一个用非JVM语言开发*spout*和*bolt*的简单例子。在这个例子中有一个*spout*产生从1到10,000的数字,一个*bolt*过滤素数,二者都用PHP实现。 **NOTE: **在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。 有一个专门为Storm实现的PHP DSL(译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。 ~~~ ... TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000)); builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator"); StormTopology topology = builder.createTopology(); ... ~~~ **NOTE:**有一种使用非JVM语言定义拓扑的方式。既然Storm拓扑是Thrift架构,而且*Nimbus*是一个Thrift守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。 这里没什么新鲜了。我们看一下**NumbersGeneratorSpout**的实现。 ~~~ public class NumberGeneratorSpout extends ShellSpout implements IRichSpout { public NumberGeneratorSpout(Integer from, Integer to) { super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString()); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("number")); } public Map<String, Object> getComponentConfiguration() { return null; } } ~~~ 你可能已经注意到了,这个*spout*继承了**ShellSpout**。这是个由Storm提供的特殊的类,用来帮助你运行并控制用其它语言编写的*spout*。在这种情况下它告诉Storm如何执行你的PHP脚本。 NumberGeneratorSpout的PHP脚本向标准输出分发元组,并从标准输入读取确认或失败信号。 在开始实现NumberGeneratorSpout.php脚本之前,多观察一下多语言协议是如何工作的。 *spout*按照传递给构造器的参数从**from**到**to**顺序生成数字。 接下来看看**PrimeNumbersFilterBolt**。这个类实现了之前提到的壳。它告诉Storm如何执行你的PHP脚本。Storm为这一目的提供了一个特殊的叫做**ShellBolt**的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。 ~~~ public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt { public PrimeNumbersFilterBolt() { super("php", "-f", "PrimeNumbersFilterBolt.php"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("number")); } } ~~~ 在这个构造器中只是告诉Storm如何运行PHP脚本。它与下列命令等价。 ~~~ php -f PrimeNumbersFilterBolt.php ~~~ PrimeNumbersFilterBolt.php脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。 1. 发起一次握手 2. 开始循环 3. 读/写元组 **NOTE:**有一种特殊的方式可以使用Storm的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。 下面我们来看一看上述每一步的细节,以及如何用PHP实现它。 **发起握手** 为了控制整个流程(开始以及结束它),Storm需要知道它执行的脚本进程号(PID)。根据多语言协议,你的进程开始时发生的第一件事就是Storm要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非JVM语言的角度理解的,这里提到的标准输入也就是PHP的标准输入)发送一段JSON数据,它包含Storm配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子: ~~~ { "conf": { "topology.message.timeout.secs": 3, // etc }, "context": { "task->component": { "1": "example-spout", "2": "__acker", "3": "example-bolt" }, "taskid": 3 }, "pidDir": "..." } ~~~ 脚本进程必须在**pidDir**指定的目录下以自己的进程号为名字创建一个文件,并以JSON格式把进程号写到标准输出。 ~~~ {"pid": 1234} ~~~ 举个例子,如果你收到**/tmp/example\n**而你的脚本进程号是123,你应该创建一个名为**/tmp/example/123**的空文件并向标准输出打印文本行 **{“pid”: 123}\n**(译者注:此处原文只有一个n,译者猜测应是排版错误)和**end\n**。这样Storm就能持续追踪进程号并在它关闭时杀死脚本进程。下面是PHP实现: ~~~ $config = json_decode(read_msg(), true); $heartbeatdir = $config['pidDir']; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush(); ~~~ 你已经实现了一个叫做**read_msg**的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行JSON文本。一条消息以**end\n**结束。 ~~~ function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function write_line($line) { echo("$line\n"); } ~~~ **NOTE:**flush()方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚本可能会为了等待一个来自Storm的输入而永远挂起,而Storm却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。 **开始循环以及读/写元组** 这是整个工作中最重要的一步。这一步的实现取决于你开发的*spout*和*bolt*。 如果是*spout*,你应当开始分发元组。如果是*bolt*,就循环读取元组,处理它们,分发它发,确认成功或失败。 下面我们就看看用来分发数字的*spout*。 ~~~ $from = intval($argv[1]); $to = intval($argv[2]); while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from<$to) { storm_emit(array("$from")); $task_ids = read_msg(); $from++; } else { sleep(1); } } storm_sync(); } ~~~ 从命令行获取参数**from**和**to**,并开始迭代。每次从Storm得到一条**next**消息,这意味着你已准备好分发下一个元组。 一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。 为了确保脚本已准备好发送下一个元组,Storm会在发送下一条之前等待**sync\n**文本行。调用**read_msg()**,读取一条命令,解析JSON。 对于*bolts*来说,有少许不同。 ~~~ while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING); if (!empty($tuple["id"])) { if (isPrime($tuple["tuple"][0])) { storm_emit(array($tuple["tuple"][0])); } storm_ack($tuple["id"]); } } ~~~ 循环的从标准输入读取元组。解析读取每一条JSON消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。 **NOTE:**在**json_decode**函数中使用的**JSON_BIGINT_AS_STRING**是为了解决一个在JAVA和PHP之间的数据转换问题。JAVA发送的一些很大的数字,在PHP中会丢失精度,这样就会导致问题。为了避开这个问题,告诉PHP把大数字当作字符串处理,并在JSON消息中输出数字时不使用双引号。PHP5.4.0或更高版本要求使用这个参数。 **emit,ack,fail,**以及**log**消息都是如下结构: **emit** ~~~ { "command": "emit", "tuple": ["foo", "bar"] } ~~~ 其中的数组包含了你分发的元组数据。 **ack** ~~~ { "command": "ack", "id": 123456789 } ~~~ 其中的id就是你处理的元组的ID。 **fail** ~~~ { "command": "fail", "id": 123456789 } ~~~ 与**ack**(译者注:原文是**emit**从上下JSON的内容和每个方法的功能上判断此处就是**ack**,可能是排版错误)相同,其中id就是你处理的元组ID。 **log ** ~~~ { "command": "log", "msg": "some message to be logged by storm." } ~~~ 下面是完整的的PHP代码。 ~~~ //你的spout: <?php function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if ($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function write_line($line) { echo("$line\n"); } function storm_emit($tuple) { $msg = array("command" => "emit", "tuple" => $tuple); storm_send($msg); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function storm_sync() { storm_send(array("command" => "sync")); } function storm_log($msg) { $msg = array("command" => "log", "msg" => $msg); storm_send($msg); flush(); } $config = json_decode(read_msg(), true); $heartbeatdir = $config['pidDir']; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush(); $from = intval($argv[1]); $to = intval($argv[2]); while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from<$to) { storm_emit(array("$from")); $task_ids = read_msg(); $from++; } else { sleep(1); } } storm_sync(); } ?> //你的bolt: <?php function isPrime($number) { if ($number < 2) { return false; } if ($number==2) { return true; } for ($i=2; $i<=$number-1; $i++) { if ($number % $i == 0) { return false; } } return true; } function read_msg() { $msg = ""; while(true) { $l = fgets(STDIN); $line = substr($l,0,-1); if ($line=="end") { break; } $msg = "$msg$line\n"; } return substr($msg, 0, -1); } function write_line($line) { echo("$line\n"); } function storm_emit($tuple) { $msg = array("command" => "emit", "tuple" => $tuple); storm_send($msg); } function storm_send($json) { write_line(json_encode($json)); write_line("end"); } function storm_ack($id) { storm_send(["command"=>"ack", "id"=>"$id"]); } function storm_log($msg) { $msg = array("command" => "log", "msg" => "$msg"); storm_send($msg); } $config = json_decode(read_msg(), true); $heartbeatdir = $config['pidDir']; $pid = getmypid(); fclose(fopen("$heartbeatdir/$pid", "w")); storm_send(["pid"=>$pid]); flush(); while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING); if (!empty($tuple["id"])) { if (isPrime($tuple["tuple"][0])) { storm_emit(array($tuple["tuple"][0])); } storm_ack($tuple["id"]); } } ?> ~~~ **NOTE:**需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为**multilang/resources**的子目录中。这个子目录被包含在发送给工人进程的jar文件中。如果你不把脚本包含在这个目录中,Storm就不能运行它们,并抛出一个错误。 **原创文章,转载请注明:** 转载自[并发编程网 – ifeve.com](http://ifeve.com/) **本文链接地址:** [Storm入门之第7章使用非JVM语言开发](http://ifeve.com/getting-started-with-storm7/)