Simple Messaging in PHP

Queuing and messaging are two super-dandy things that I've had plenty of opportunities to play with while at Shutterstock. And with such a diverse company there are plenty of options to choose from. For a while I played with Resque, then it was RabbitMQ, then it was Kafka and Amazon SQS, all the while supporting an ancient MySQL-backed queue system with tons of technical debt mounted around it. When it came time to build a queue for my personal site, though, all of these options seemed over-engineered.

My site needed a simple image processor. It's a three-step process: metadata is defined, metadata is injected into the raw photo, and then a few thumbnails are generated. I used to handle the entire process in a single monolithic script that would munch up memory and be a pain to split up. If I wanted to thumbnail a bunch of prepared images I'd have to comment out a ton of code and hope it all worked. A better solution would be to split the three steps into three separate jobs that pass messages between them and can operate independently. But again, setting up something like SQS or a Redis server just seemed too big for this problem.

Enter Semaphore. This is a set of functions built into PHP that wrap System V IPC: semaphores, shared memory, and message queues (the msg_* functions used here need PHP compiled with --enable-sysvmsg). As long as all of the independent scripts are running on the same machine, they can reference the same System V message queue by its key, thus passing messages between separate jobs. Here's an example of a simple sender with plenty of safeguards.

    $queue = msg_get_queue(1337);

    $message = (object) [
        'id' => uniqid(),
        'data' => 'hello world',
    ];

    if (msg_send(
        $queue,
        1, // msgtype
        $message,
        true, // session-like serialization
        false, // blocking: false means fail instead of waiting for space
        $error_code // returned error code
    )) {
        echo 'INFO: Added new message to queue, id: ',
            $message->id,
            PHP_EOL;
    } else {
        echo 'ERROR: Failed to add to queue, error: ',
            $error_code,
            PHP_EOL;
    }

The message queue is instantiated with a defined integer key, the same key that will be used to pull from the queue, so depending on your architecture it may make sense to store this in a bootstrap somewhere. You can throw most types of data into the queue - it will all get serialized safely. Oh, and the blocking is pretty neat. By default the msg_send() call will sit and wait until the queue has enough free space to store the message. If you set blocking to false, then msg_send() returns false right away and sets $error_code to MSG_EAGAIN.
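To make the bootstrap idea and the non-blocking behaviour a bit more concrete, here's a minimal sketch. The constant name IMAGE_QUEUE_KEY, the helper try_enqueue(), and its 'sent' / 'full' / 'error' return values are all made up for this example; only the msg_* calls and MSG_EAGAIN come from PHP itself.

```php
<?php
// Hypothetical bootstrap constant so senders and workers agree on the key.
const IMAGE_QUEUE_KEY = 1337;

/**
 * Try to enqueue a message without blocking.
 * Returns 'sent', 'full', or 'error' (labels invented for this sketch).
 */
function try_enqueue($queue, $message)
{
    $error_code = 0;

    // Fifth argument false: do not wait for space in the queue.
    if (msg_send($queue, 1, $message, true, false, $error_code)) {
        return 'sent';
    }

    // MSG_EAGAIN means the queue had no room for the message right now.
    return $error_code === MSG_EAGAIN ? 'full' : 'error';
}

$queue = msg_get_queue(IMAGE_QUEUE_KEY);
$status = try_enqueue($queue, (object) [
    'id' => uniqid(),
    'data' => 'hello world',
]);

echo 'INFO: Enqueue status: ', $status, PHP_EOL;
```

A 'full' result is probably a cue to retry later, while 'error' points at something else, like a removed queue. Note that msg_send() also raises a PHP warning when it fails.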

Pulling from the queue is about as easy as you'd expect. As long as you use the same integer key to instantiate the message queue, the script will just pop off the messages one at a time.

    $queue = msg_get_queue(1337);

    while (msg_receive(
        $queue,
        1, // desired msgtype
        $msg_type, // set to the msgtype of the received message
        512, // maxsize
        $message, // received message
        true, // whether or not to un-serialize
        MSG_IPC_NOWAIT, // don't wait if the queue is empty
        $error_code // returned error code
    )) {
        echo 'INFO: Received a new message, id: ',
            $message->id,
            PHP_EOL;
    }

    if (
        $error_code &&
        $error_code != MSG_ENOMSG
    ) {
        echo 'ERROR: Received an error: ',
            $error_code,
            PHP_EOL;
    }

    echo 'Done running worker.', PHP_EOL;

The flag MSG_IPC_NOWAIT is important, as is the decision to place this code in a while() loop. There are plenty of ways to structure a worker script that ingests messages. This one will grab a message, process it (which is only displaying a line of output), and then try to grab the next message. Once there are no more, the MSG_IPC_NOWAIT flag makes msg_receive() return false instead of waiting, so the script escapes the loop, skips the error display (the error code is set to MSG_ENOMSG in this case), and stops.

Another way to structure this is to build a daemon. If you don't pass in that flag then the while loop will run until an error is encountered, sitting and waiting for new messages to come in. Or you can skip the loop idea and just write msg_receive() inline with the script, which would wait for a single message, process it, and then exit. Any way works depending on how heavy the processing is.
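Here's a rough sketch of the daemon flavour. The function name run_daemon() and the 'stop' sentinel message are conventions I'm inventing for this example (PHP has nothing like that built in); the sentinel just gives the loop a clean way to exit.

```php
<?php
/**
 * Block on the queue and hand each message to $handler.
 * Stops on a message whose data is 'stop' (a sentinel invented for this
 * sketch) or when msg_receive() fails. Returns the number handled.
 */
function run_daemon($queue, callable $handler)
{
    $handled = 0;

    // flags = 0: msg_receive() sits and waits until a message arrives.
    while (msg_receive($queue, 1, $msg_type, 512, $message, true, 0, $error_code)) {
        if (isset($message->data) && $message->data === 'stop') {
            break;
        }

        $handler($message);
        $handled++;
    }

    return $handled;
}

// Usage (commented out here because it blocks until a 'stop' message arrives):
// run_daemon(msg_get_queue(1337), function ($message) {
//     echo 'INFO: Received a new message, id: ', $message->id, PHP_EOL;
// });
```

Without the sentinel you'd stop the daemon by killing the process or removing the queue, which makes the blocked msg_receive() call fail and ends the loop.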

Anyways, that's all you need to do to set up a single-server messaging process. If you run the first script on a server, see no errors, and then run the second script sometime after, you should see the same ID in each output. For my image processing I have a form (that is powered by ReactPHP, it's pretty awesome) that sets up my first job to 'save metadata to image', and that job triggers another five jobs on completion to 'create thumbs'. It's been working great and has really sped up my image processing while keeping everything pleasantly separated.
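The hand-off from one job to the next could look something like the sketch below. This is not my actual code: the MSGTYPE_* constants, the queue_thumb_jobs() helper, and the five widths are hypothetical. The one real trick is that msg_receive() only returns messages of the desired msgtype when it's a positive number, so each stage can share a queue and listen on its own type.

```php
<?php
const MSGTYPE_METADATA = 1; // hypothetical type for 'save metadata to image'
const MSGTYPE_THUMB = 2;    // hypothetical type for 'create thumbs'

/**
 * Queue one thumbnail job per width once the metadata step is done.
 * Returns the number of jobs that were queued successfully.
 */
function queue_thumb_jobs($queue, $photo_id, array $widths)
{
    $queued = 0;

    foreach ($widths as $width) {
        $job = (object) ['id' => $photo_id, 'width' => $width];

        // Non-blocking send, tagged with the thumbnail msgtype.
        if (msg_send($queue, MSGTYPE_THUMB, $job, true, false, $error_code)) {
            $queued++;
        }
    }

    return $queued;
}

$queue = msg_get_queue(1337);

// Made-up widths, one job each.
$queued = queue_thumb_jobs($queue, uniqid(), [100, 200, 400, 800, 1600]);

echo 'INFO: Queued ', $queued, ' thumbnail jobs', PHP_EOL;
```

A thumbnail worker would then call msg_receive() with MSGTYPE_THUMB as the desired msgtype and never see the metadata messages.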