| WebSphere-MQTT-Client documentation | Contained in the WebSphere-MQTT-Client distribution. |
WebSphere::MQTT::Persist::File - filesystem persistence object for MQTT
use WebSphere::MQTT::Client;
use WebSphere::MQTT::Persist::File;
my $mqtt = WebSphere::MQTT::Client->new(
Hostname => 'localhost',
Persist => WebSphere::MQTT::Persist::File->new('/tmp/wmqtt'),
Async = 1,
);
$mqtt->connect();
$mqtt->publish("mydata", "mytopic", 1); # QOS 1/2 data is persisted
WebSphere::MQTT::Persist::File
This is a Perl implementation of a persistence object for MQTT
For details of the API, see doc/ia93.pdf, Chapter 3, "WMQTT Persistence Interface"
WARNING: THIS IS NOT IBM CODE AND HAS NOT BEEN HEAVILY TESTED. USE AT YOUR OWN RISK. YOU ARE ADVISED NOT TO ENTRUST CRITICAL DATA TO THIS LAYER!
Please report any bugs or feature requests to
bug-websphere-mqtt-client@rt.cpan.org, or through the web interface at
http://rt.cpan.org.
Brian Candler, B.Candler@pobox.com
Copyright (C) 2006 Brian Candler
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.005 or, at your option, any later version of Perl 5 you may have available.
| WebSphere-MQTT-Client documentation | Contained in the WebSphere-MQTT-Client distribution. |
package WebSphere::MQTT::Persist::File; ################ # # MQTT: WebSphere MQ Telemetry Transport persistence object # # Re-written in Perl since the IBM C source (mspfp.c) is not in # the list of redistributable files. # # Brian Candler # B.Candler@pobox.com # use strict; use Carp; use Fcntl qw(SEEK_END SEEK_SET); sub new { my $class = shift; my $basedir = shift; $basedir = '/tmp/wmqtt' unless defined $basedir; # Store parameters my $self = { 'basedir' => $basedir, }; # Bless the hash into an object bless $self, $class; return $self; } sub open { my ($self, $clientid, $broker, $port) = @_; $self->{'sentdir'} = sprintf("%s/%s/%s_%s/sent", $self->{'basedir'}, $clientid, $broker, $port); $self->{'rcvddir'} = sprintf("%s/%s/%s_%s/rcvd", $self->{'basedir'}, $clientid, $broker, $port); mkdir_p($self->{'sentdir'}, 0740); mkdir_p($self->{'rcvddir'}, 0740); 0; } sub mkdir_p { my ($name, $mode) = @_; $mode = 0777 unless defined $mode; my @dirs = split('/', $name); my $path = ''; while(@dirs) { $path .= shift(@dirs).'/'; next if $path eq '/'; next if mkdir($path, $mode) || $!{EEXIST} || $!{EISDIR}; croak("mkdir_p: $!"); } } sub close { 0; } # Called if clean_start=1 on connect. Delete all files in # the sent and rcvd directories sub reset { my $self = shift; forallfiles($self->{'sentdir'}, sub { unlink $_[0] }); forallfiles($self->{'rcvddir'}, sub { unlink $_[0] }); 0; } # Run the passed function for all files in a directory # (can you tell I prefer Ruby? :-) sub forallfiles { my $dir = shift; my $callback = shift; opendir(DIR,$dir) || croak("opendir $dir: $!"); while (my $f = readdir(DIR)) { next if ($f eq '.' || $f eq '..'); $callback->("$dir/$f"); } closedir(DIR); } # Called if clean_start=0 on connect. Load all received messages found on # disk into an array of [id,message,id,message,...] sub getAllReceivedMessages { my $self = shift; my @res = (); forallfiles($self->{'rcvddir'}, sub { BLOCK: { last BLOCK unless $_[0] =~ /\/(\d+)$/; my $key = $1; unless (CORE::open(F, '<', $_[0])) { warn("open $_[0]: $!\n"); last BLOCK; } my @stat = stat(F); unless (@stat) { warn("stat $_[0]: $!\n"); CORE::close(F); last BLOCK; } my $len = $stat[7]; my $data = ""; if (sysread(F, $data, $len) != $len) { warn("sysread $_[0]: $!\n"); CORE::close(F); last BLOCK; } push @res, $key, $data; CORE::close(F); }}); @res; } # Called if clean_start=0 on connect. Load all received messages found on # disk into an array of [id,message,id,message,...]. Note that the library # may not actually attempt to send these messages until 'retry_interval' # seconds have passed. # # When recovering sent messages we need to ensure that we only restore the # latest message associated with a particular key. When updSentMessage is # called to replace a PUBLISH with a PUBREL there is a small overlap where # both files (NNN and NNNu) are present. Failure at this point would result # in both messages be available for recovery, which would result in # duplication. So if both a PUBLISH and PUBREL are found for the same key, # we need to ensure that only the PUBREL is recovered. sub getAllSentMessages { my $self = shift; my @res = (); my %seen = (); forallfiles($self->{'sentdir'}, sub { BLOCK: { last BLOCK unless $_[0] =~ /\/(\d+)(u?)$/; my ($key, $u) = ($1, $2); # This is NNN and we've already seen NNNu? Ignore it last BLOCK if ($u eq '' && $seen{$key}); # Read in the file unless (CORE::open(F, '<', $_[0])) { warn("open $_[0]: $!"); last BLOCK; } my @stat = stat(F); unless (@stat) { warn("stat $_[0]: $!\n"); CORE::close(F); last BLOCK; } my $len = $stat[7]; my $data = ""; if (sysread(F, $data, $len) != $len) { warn("sysread $_[0]: $!\n"); CORE::close(F); last BLOCK; } CORE::close(F); # This is NNNu and we've already seen NNN? Replace it if ($u eq 'u' && $seen{$key}) { $res[$seen{$key}] = $data; last BLOCK; } push @res, $key, $data; $seen{$key} = $#res; }}); @res; } # The actual writing of a message file. We take a bit of care here # to ensure only whole messages are left in the filesystem sub addMessage { my ($self, $key, $data, $dir, $suffix) = @_; my $tmp = "$dir/tmp$key${suffix}_$$"; CORE::open(F,'>',$tmp) || return 1; binmode(F); goto FAIL unless syswrite(F, $data) == length($data); goto FAIL2 unless CORE::close(F); goto FAIL2 unless rename($tmp,"$dir/$key$suffix"); return 0; FAIL: CORE::close(F); FAIL2: unlink($tmp); return 1; } sub addSentMessage { my ($self, $key, $data) = @_; $self->addMessage($key,$data,$self->{'sentdir'}, ''); } sub updSentMessage { my ($self, $key, $data) = @_; my $rc = $self->addMessage($key,$data,$self->{'sentdir'}, 'u'); unlink("$self->{'sentdir'}/$key") if $rc == 0; $rc; } sub delSentMessage { my $self = shift; my $key = shift; return 1 if (unlink("$self->{'sentdir'}/$key") == 0 && ! $!{ENOENT}); return 1 if (unlink("$self->{'sentdir'}/${key}u") == 0 && ! $!{ENOENT}); 0; } sub addReceivedMessage { my ($self, $key, $data) = @_; $self->addMessage($key,$data,$self->{'rcvddir'}, ''); } # This is the weird one. We have to OR the last byte of the file with 0x01 sub updReceivedMessage { my $self = shift; my $key = shift; CORE::open(F,'+<',"$self->{'rcvddir'}/$key") || return 1; binmode(F); my $pos = sysseek(F, -1, Fcntl::SEEK_END); goto FAIL unless defined $pos && $pos >= 0; my $d = ''; goto FAIL unless sysread(F, $d, 1) == 1; $d = chr(ord($d)|0x01); my $pos2 = sysseek(F, $pos, Fcntl::SEEK_SET); goto FAIL unless $pos2 == $pos; goto FAIL unless syswrite(F, $d, 1) == 1; return 1 unless CORE::close(F); return 0; FAIL: CORE::close(F); 1; } sub delReceivedMessage { my $self = shift; my $key = shift; return unlink("$self->{'rcvddir'}/$key") != 1; } 1; __END__