OpenVeo Publish server

API Docs for: 8.0.0
Show:

File: app/server/PublishManager.js

'use strict';

/**
 * @module publish
 */

var util = require('util');
var events = require('events');
var path = require('path');
var shortid = require('shortid');
var openVeoApi = require('@openveo/api');
var Package = process.requirePublish('app/server/packages/Package.js');
var packageFactory = process.requirePublish('app/server/packages/packageFactory.js');
var ERRORS = process.requirePublish('app/server/packages/errors.js');
var STATES = process.requirePublish('app/server/packages/states.js');
var PublishError = process.requirePublish('app/server/PublishError.js');
var ResourceFilter = openVeoApi.storages.ResourceFilter;

var publishManager;

/**
 * Fired when an error occurred while processing a package.
 *
 * @event error
 * @param {Error} The error
 */

/**
 * Fired when a package process has succeed.
 *
 * @event complete
 * @param {Object} The processed package
 */

/**
 * Fired when a media in error restarts.
 *
 * @event retry
 * @param {Object} The media
 */

/**
 * Fired when a media stuck in "waiting for upload" state starts uploading.
 *
 * @event upload
 * @param {Object} The media
 */

/**
 * Fired when media state has changed.
 *
 * @event stateChanged
 * @param {Object} The media
 */

/**
 * Defines the PublishManager which handles the media publication's process.
 *
 * Media publications are handled in parallel. Media publication's process can be
 * different regarding the type of the media.
 *
 * @example
 *     var coreApi = process.api.getCoreApi();
 *     var database = coreApi.getDatabase();
 *     var PublishManager = process.requirePublish('app/server/PublishManager.js');
 *     var videoProvider = new VideoProvider(database);
 *     var publishManager = new PublishManager(videoProvider, 5);
 *
 *     // Listen publish manager's errors
 *     publishManager.on('error', function(error) {
 *       // Do something
 *     });
 *
 *     // Listen to publish manager's end of processing for a media
 *     publishManager.on('complete', function(mediaPackage){
 *       // Do something
 *     });
 *
 *     // Listen to publish manager's event informing that a media processing is retrying
 *     publishManager.on('retry', function(mediaPackage) {
 *       // Do something
 *     });
 *
 *     // Listen to publish manager's event informing that a media, waiting for upload, starts uploading
 *     publishManager.on('upload', function(mediaPackage) {
 *       // Do something
 *     });
 *
 *     publishManager.publish({
 *       type: 'youtube', // The media platform to use for this media
 *       originalPackagePath: '/home/openveo/medias/media-package.tar', // Path of the media package
 *       originalFileName: 'media-package' // File name without extension
 *     });
 *
 * @class PublishManager
 * @constructor
 * @param {VideoProvider} videoProvider The media provider
 * @param {Number} [maxConcurrentPackage=3] The maximum number of medias to treat in parallel
 */
function PublishManager(videoProvider, maxConcurrentPackage) {
  if (publishManager)
    throw new Error('PublishManager already instanciated, use get method instead');

  Object.defineProperties(this, {

    /**
     * Medias waiting to be processed.
     *
     * @property queue
     * @type Array
     * @final
     */
    queue: {value: []},

    /**
     * Medias being processed.
     *
     * @property pendingPackages
     * @type Array
     * @final
     */
    pendingPackages: {value: []},

    /**
     * Media provider.
     *
     * @property videoProvider
     * @type VideoProvider
     * @final
     */
    videoProvider: {value: videoProvider},

    /**
     * Maximum number of medias to treat in parallel.
     *
     * @property maxConcurrentPackage
     * @type Number
     * @final
     */
    maxConcurrentPackage: {value: maxConcurrentPackage || 3}

  });
}

util.inherits(PublishManager, events.EventEmitter);
module.exports = PublishManager;

/**
 * Removes a media from pending medias.
 *
 * @method removeFromPending
 * @private
 * @param {Object} mediaPackage The media package to remove
 */
function removeFromPending(mediaPackage) {
  for (var i = 0; i < this.pendingPackages.length; i++) {
    if (this.pendingPackages[i]['id'] === mediaPackage.id) {
      this.pendingPackages.splice(i, 1);
      break;
    }
  }
  for (var j = 0; j < this.pendingPackages.length; j++) {
    if (this.pendingPackages[j]['originalFileName'] === mediaPackage.originalFileName) {
      this.pendingPackages.splice(j, 1);
      break;
    }
  }
  process.logger.debug('Package ' + mediaPackage.id + ' from ' +
   mediaPackage.originalFileName + ' is removed from pendingPackages');
}

/**
 * Handles media error event.
 *
 * @method onError
 * @private
 * @param {Error} error The error
 * @param {Object} mediaPackage The media on error
 */
function onError(error, mediaPackage) {

  // Remove media from pending medias
  removeFromPending.call(this, mediaPackage);

  // Publish pending media from FIFO queue
  if (this.queue.length)
    this.publish(this.queue.shift(0));

  // Add media id to the error message
  if (error)
    error.message += ' (' + mediaPackage.id + ')';

  this.emit('error', error, error.code);
}

/**
 * Handles media complete event.
 *
 * @method onComplete
 * @private
 * @param {Object} mediaPackage The package on error
 */
function onComplete(mediaPackage) {

  // Remove package from pending packages
  removeFromPending.call(this, mediaPackage);

  // Publish pending package from FIFO queue
  if (this.queue.length)
    this.publish(this.queue.shift(0));

  this.emit('complete', mediaPackage);
}

/**
 * Creates a media package manager corresponding to the media type.
 *
 * @method createMediaPackageManager
 * @private
 * @param {Object} mediaPackage The media to manage
 * @return {Package} A media package manager
 */
function createMediaPackageManager(mediaPackage) {
  var self = this;
  var mediaPackageManager = packageFactory.get(mediaPackage.packageType, mediaPackage);

  // Handle errors from media package manager
  mediaPackageManager.on('error', function(error) {
    onError.call(self, error, mediaPackage);
  });

  // Handle complete events from media package manager
  mediaPackageManager.on('complete', function(completePackage) {
    onComplete.call(self, completePackage);
  });

  // Handle stateChanged events from media package manager
  mediaPackageManager.on('stateChanged', function(mediaPackage) {
    self.emit('stateChanged', mediaPackage);
  });

  return mediaPackageManager;
}

/**
 * Adds media package to the list of pending packages.
 *
 * @method addPackage
 * @private
 * @param {Object} mediaPackage The media package to add to pending packages
 * @return {Boolean} true if the media package is successfully added to pending packages
 * false if it has been added to queue
 */
function addPackage(mediaPackage) {
  process.logger.debug('Actually ' + this.pendingPackages.length + ' pending packages');
  var idAllreadyPending = this.pendingPackages.filter(function(pendingPackage) {
    return mediaPackage.originalFileName === pendingPackage.originalFileName;
  });

  // Too much pending packages
  if (this.pendingPackages.length >= this.maxConcurrentPackage || idAllreadyPending.length) {

    // Add package to queue
    this.queue.push(mediaPackage);
    process.logger.debug('Add package ' + mediaPackage.originalPackagePath + '(' + mediaPackage.id + ') to queue');
    return false;
  } else {

    // Process can deal with the package
    process.logger.debug('Add package ' + mediaPackage.originalPackagePath +
                      '(' + mediaPackage.id + ') to pending packages');

    // Add package to the list of pending packages
    this.pendingPackages.push(mediaPackage);
    return true;
  }
}

/**
 * Gets an instance of the PublishManager.
 *
 * @method get
 * @static
 * @param {VideoProvider} videoProvider The media provider
 * @param {Number} [maxConcurrentPackage] The maximum number of medias to treat in parallel
 * @return {PublishManager} The PublishManager singleton instance
 */
PublishManager.get = function(videoProvider, maxConcurrentPackage) {
  if (!publishManager)
    publishManager = new PublishManager(videoProvider);

  return publishManager;
};

/**
 * Publishes the given media package.
 *
 * Media package must be of one of the supported type.
 *
 * @method publish
 * @param {Object} mediaPackage Media to publish
 * @param {String} mediaPackage.originalPackagePath Package absolute path
 * @param {String} mediaPackage.packageType The package type
 * @param {String} [mediaPackage.title] The title to use for this media, default to the file name without extension
 */
PublishManager.prototype.publish = function(mediaPackage) {
  var self = this;

  if (mediaPackage && (typeof mediaPackage === 'object')) {

    // Media package can be in queue and already have an id
    if (!mediaPackage.id) {
      var pathDescriptor = path.parse(mediaPackage.originalPackagePath);
      mediaPackage.id = shortid.generate();
      mediaPackage.title = mediaPackage.title || pathDescriptor.name;
    }

    self.videoProvider.getOne(
      new ResourceFilter().equal('originalPackagePath', mediaPackage.originalPackagePath),
      {
        include: ['id']
      },
      function(error, media) {
        if (error) {
          self.emit('error', new PublishError('Getting media with original package path "' +
                                              mediaPackage.originalPackagePath + '" failed with message : ' +
                                              error.message, ERRORS.UNKNOWN));
        } else if (!media) {

          // Package can be added to pending packages as a new one
          if (addPackage.call(self, mediaPackage)) {

            // Media package does not exist
            // Publish it
            var mediaPackageManager = createMediaPackageManager.call(self, mediaPackage);
            mediaPackageManager.init(Package.STATES.PACKAGE_SUBMITTED, Package.TRANSITIONS.INIT);
            mediaPackageManager.executeTransition(Package.TRANSITIONS.INIT);

          }

        } else if (media.id === mediaPackage.id) {

          // Media already exists
          // Retry media
          self.retry(media.id, true);

        } else {
          self.emit(
            'error',
            new PublishError('Duplicate file "' + mediaPackage.originalPackagePath, ERRORS.DUPLICATE_MEDIA)
          );
        }
      }
    );
  } else
    this.emit('error', new PublishError('mediaPackage argument must be an Object', ERRORS.UNKNOWN));
};

/**
 * Retries publishing a media package which is on error.
 *
 * @method retry
 * @param {String} packageId The id of the package on error
 * @param {Boolean} forceRetry Force retrying a package no matter its state
 */
PublishManager.prototype.retry = function(packageId, forceRetry) {
  if (packageId) {
    var self = this;

    // Retrieve package information
    this.videoProvider.getOne(
      new ResourceFilter().equal('id', packageId),
      null,
      function(error, mediaPackage) {
        if (error) {
          self.emit(
            'error',
            new PublishError(
              'Getting package ' + packageId + ' failed with message : ' + error.message,
              ERRORS.UNKNOWN
            )
          );
        } else if (!mediaPackage) {

          // Package does not exist
          self.emit('error', new PublishError('Cannot retry package ' + packageId + ' (not found)',
                                              ERRORS.PACKAGE_NOT_FOUND));

        } else if (mediaPackage.state === STATES.ERROR || forceRetry) {

          // Got package information
          // Package is indeed in error
          self.videoProvider.updateState(mediaPackage.id, STATES.PENDING, function() {

            // Retry officially started
            self.emit('retry', mediaPackage);
            self.emit('stateChanged', mediaPackage);

          });

          var mediaPackageManager = createMediaPackageManager.call(self, mediaPackage);
          process.logger.info('Retry package ' + mediaPackage.id);
          mediaPackageManager.init(mediaPackage.lastState, mediaPackage.lastTransition);

          // Package can be added to pending packages
          if (addPackage.call(self, mediaPackage))
            mediaPackageManager.executeTransition(mediaPackage.lastTransition);
        }

      }
    );
  }
};

/**
 * Retries publishing all packages in a non stable state.
 *
 * Stable states are :
 * - STATES.ERROR
 * - STATES.WAITING_FOR_UPLOAD
 * - STATES.READY
 * - STATES.PUBLISHED
 *
 * @method retryAll
 */
PublishManager.prototype.retryAll = function() {
  var self = this;

  // Retrieve all packages in a non stable state
  this.videoProvider.getAll(
    new ResourceFilter()
    .notIn('state', [
      STATES.ERROR,
      STATES.WAITING_FOR_UPLOAD,
      STATES.READY,
      STATES.PUBLISHED
    ]),
    null,
    {
      id: 'desc'
    },
    function(error, mediaPackages) {
      if (error)
        return self.emit('error', new PublishError('Getting packages in non stable state failed with message : ' +
                                                   error.message,
                                              ERRORS.UNKNOWN));

      mediaPackages.forEach(function(mediaPackage) {
        self.retry(mediaPackage.id, true);
      });

    }
  );

};

/**
 * Uploads a media blocked in "waiting to upload" state.
 *
 * @method upload
 * @param {String} packageId The id of the package waiting to be uploaded
 * @param {String} platform The type of the video platform to upload to
 */
PublishManager.prototype.upload = function(packageId, platform) {
  if (packageId && platform) {
    var self = this;

    // Retrieve package information
    this.videoProvider.getOne(
      new ResourceFilter().equal('id', packageId),
      null,
      function(error, mediaPackage) {
        if (error) {
          self.emit(
            'error',
            new PublishError(
              'Getting package ' + packageId + ' failed with message : ' + error.message,
              ERRORS.UNKNOWN
            )
          );
        } else if (!mediaPackage) {

          // Package does not exist
          self.emit('error', new PublishError('Cannot upload package ' + packageId + ' (not found)',
                                              ERRORS.PACKAGE_NOT_FOUND));

        } else if (mediaPackage.state === STATES.WAITING_FOR_UPLOAD) {

          // Package is indeed waiting for upload
          self.videoProvider.updateState(mediaPackage.id, STATES.PENDING, function() {

            // Upload officially started
            self.emit('upload', mediaPackage);
            self.emit('stateChanged', mediaPackage);

          });
          self.videoProvider.updateType(mediaPackage.id, platform);

          var mediaPackageManager = createMediaPackageManager.call(self, mediaPackage);
          process.logger.info('Force upload package ' + mediaPackage.id);
          mediaPackage.type = platform;
          mediaPackageManager.init(mediaPackage.lastState, mediaPackage.lastTransition);

          // Package can be added to pending packages
          if (addPackage.call(self, mediaPackage))
            mediaPackageManager.executeTransition(mediaPackage.lastTransition);

        }

      }
    );
  }
};