package de.fzj.unicore.wsrflite.messaging;

import de.fzj.unicore.persist.Persist;
import de.fzj.unicore.persist.PersistenceException;
import de.fzj.unicore.persist.PersistenceFactory;
import de.fzj.unicore.persist.PersistenceProperties;
import de.fzj.unicore.wsrflite.messaging.impl.MessageBean;
import eu.unicore.util.Log;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.log4j.Logger;

/* loaded from: input_file:de/fzj/unicore/wsrflite/messaging/MessagingImpl.class */
public class MessagingImpl implements IMessaging {
    private static final Logger logger = Log.getLogger("unicore.wsrflite", MessagingImpl.class);
    private final Persist<MessageBean> store;
    private final HashMap<String, IMessagingProvider> providers;
    private final PersistenceProperties persistenceProperties;

    public MessagingImpl(PersistenceProperties persistenceProperties) throws MessagingException {
        this.persistenceProperties = persistenceProperties;
        try {
            this.store = createStore();
            this.providers = new HashMap<>();
        } catch (PersistenceException e) {
            throw new MessagingException((Throwable) e);
        }
    }

    protected Persist<MessageBean> createStore() throws PersistenceException {
        return PersistenceFactory.get(this.persistenceProperties).getPersist(MessageBean.class);
    }

    public int getStoredMessages() throws PersistenceException {
        return this.store.getRowCount();
    }

    @Override // de.fzj.unicore.wsrflite.messaging.IMessaging
    public void registerProvider(IMessagingProvider iMessagingProvider, String str) {
        if (iMessagingProvider == null || str == null) {
            logger.warn("Null provider or id, continuing.");
        } else if (this.providers.put(str, iMessagingProvider) == null) {
            logger.info("Registered new provider <" + iMessagingProvider.getClass().getName() + "> as <" + str + ">");
        }
    }

    @Override // de.fzj.unicore.wsrflite.messaging.IMessaging
    public boolean hasMessages(String str) {
        try {
            return this.store.getRowCount("destination", str) > 0;
        } catch (PersistenceException e) {
            return false;
        }
    }

    @Override // de.fzj.unicore.wsrflite.messaging.IMessaging
    public PullPoint getPullPoint(final String str) throws MessagingException {
        final ArrayList arrayList = new ArrayList();
        synchronized (this.store) {
            try {
                for (String str2 : this.store.getIDs("destination", str)) {
                    arrayList.add(((MessageBean) this.store.read(str2)).message);
                    this.store.remove(str2);
                }
            } catch (Exception e) {
                throw new MessagingException(e);
            }
        }
        return new PullPoint() { // from class: de.fzj.unicore.wsrflite.messaging.MessagingImpl.1
            Iterator<Message> messageIterator;

            {
                this.messageIterator = arrayList.iterator();
            }

            @Override // de.fzj.unicore.wsrflite.messaging.PullPoint
            public boolean hasNext() {
                return this.messageIterator.hasNext();
            }

            @Override // de.fzj.unicore.wsrflite.messaging.PullPoint
            public Message next() {
                Message next = this.messageIterator.next();
                this.messageIterator.remove();
                return next;
            }

            @Override // de.fzj.unicore.wsrflite.messaging.PullPoint
            public void dispose() {
                synchronized (MessagingImpl.this.store) {
                    while (hasNext()) {
                        Message next = next();
                        try {
                            MessagingImpl.this.store.write(new MessageBean(next.getMessageId(), str, next));
                        } catch (PersistenceException e2) {
                            MessagingImpl.logger.warn("Could not write message.");
                        }
                    }
                }
            }
        };
    }

    @Override // de.fzj.unicore.wsrflite.messaging.IMessaging
    public IMessagingChannel getQueue(String str) throws MessagingException {
        return getChannel(str);
    }

    @Override // de.fzj.unicore.wsrflite.messaging.IMessaging
    public IMessagingChannel getChannel(final String str) throws MessagingException {
        IMessagingProvider iMessagingProvider = this.providers.get(str);
        return iMessagingProvider != null ? iMessagingProvider.getChannel() : new IMessagingChannel() { // from class: de.fzj.unicore.wsrflite.messaging.MessagingImpl.2
            @Override // de.fzj.unicore.wsrflite.messaging.IMessagingChannel
            public void publish(Message message) throws MessagingException {
                try {
                    MessagingImpl.this.store.write(new MessageBean(message.getMessageId(), str, message));
                } catch (Exception e) {
                    throw new MessagingException(e);
                }
            }
        };
    }

    public void cleanup() throws PersistenceException {
        this.store.removeAll();
    }
}
