jobs: Abort when systemd's event loop lost
[src/app-framework-binder.git] / src / jobs.c
index 9c81700..27b7dfb 100644 (file)
 #endif
 
 #include "jobs.h"
-#include "fdev-epoll.h"
 #include "sig-monitor.h"
 #include "verbose.h"
 
+#if defined(REMOVE_SYSTEMD_EVENT)
+#include "fdev-epoll.h"
+#endif
+
 #if 0
 #define _alert_ "do you really want to remove signal monitoring?"
 #define sig_monitor_init_timeouts()  ((void)0)
@@ -92,8 +95,8 @@ struct thread
        struct thread *upper;  /**< upper same thread */
        struct job *job;       /**< currently processed job */
        pthread_t tid;         /**< the thread id */
-       unsigned stop: 1;      /**< stop requested */
-       unsigned waits: 1;     /**< is waiting? */
+       volatile unsigned stop: 1;      /**< stop requested */
+       volatile unsigned waits: 1;     /**< is waiting? */
 };
 
 /**
@@ -132,8 +135,11 @@ static struct job *free_jobs;
 
 /* event loop */
 static struct evloop evloop[1];
+
+#if defined(REMOVE_SYSTEMD_EVENT)
 static struct fdev_epoll *fdevepoll;
 static int waitevt;
+#endif
 
 /**
  * Create a new job with the given parameters
@@ -267,6 +273,7 @@ static void job_cancel(int signum, void *arg)
        job->callback(SIGABRT, job->arg);
 }
 
+#if defined(REMOVE_SYSTEMD_EVENT)
 /**
  * Gets a fdev_epoll item.
  * @return a fdev_epoll or NULL in case of error
@@ -281,6 +288,7 @@ static struct fdev_epoll *get_fdevepoll()
 
        return result;
 }
+#endif
 
 /**
  * Monitored normal callback for events.
@@ -305,7 +313,8 @@ static void evloop_run(int signum, void *arg)
                rc = sd_event_prepare(se);
                if (rc < 0) {
                        errno = -rc;
-                       ERROR("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
+                       CRITICAL("sd_event_prepare returned an error (state: %d): %m", sd_event_get_state(se));
+                       abort();
                } else {
                        if (rc == 0) {
                                rc = sd_event_wait(se, (uint64_t)(int64_t)-1);
@@ -329,6 +338,7 @@ static void evloop_run(int signum, void *arg)
 }
 
 
+#if defined(REMOVE_SYSTEMD_EVENT)
 /**
  * Monitored normal loop for waiting events.
  * @param signum 0 on normal flow or the number
@@ -343,6 +353,7 @@ static void monitored_wait_and_dispatch(int signum, void *arg)
                fdev_epoll_wait_and_dispatch(fdev_epoll, -1);
        }
 }
+#endif
 
 /**
  * Main processing loop of threads processing jobs.
@@ -355,6 +366,9 @@ static void thread_run(volatile struct thread *me)
 {
        struct thread **prv;
        struct job *job;
+#if !defined(REMOVE_SYSTEMD_EVENT)
+       struct evloop *el;
+#endif
 
        /* initialize description of itself and link it in the list */
        me->tid = pthread_self();
@@ -378,7 +392,7 @@ static void thread_run(volatile struct thread *me)
                }
 
                /* get a job */
-               job = job_get(first_job);
+               job = job_get();
                if (job) {
                        /* prepare running the job */
                        remains++; /* increases count of job that can wait */
@@ -392,6 +406,28 @@ static void thread_run(volatile struct thread *me)
 
                        /* release the run job */
                        job_release(job);
+#if !defined(REMOVE_SYSTEMD_EVENT)
+               } else {
+                       /* no job, check events */
+                       el = &evloop[0];
+                       if (el->sdev && !__atomic_load_n(&el->state, __ATOMIC_RELAXED)) {
+                               /* run the events */
+                               __atomic_store_n(&el->state, EVLOOP_STATE_LOCK|EVLOOP_STATE_RUN|EVLOOP_STATE_WAIT, __ATOMIC_RELAXED);
+                               current_evloop = el;
+                               pthread_mutex_unlock(&mutex);
+                               sig_monitor(0, evloop_run, el);
+                               pthread_mutex_lock(&mutex);
+                       } else {
+                               /* no job and not events */
+                               running--;
+                               if (!running)
+                                       ERROR("Entering job deep sleep! Check your bindings.");
+                               me->waits = 1;
+                               pthread_cond_wait(&cond, &mutex);
+                               me->waits = 0;
+                               running++;
+                       }
+#else
                } else if (waitevt) {
                        /* no job and not events */
                        running--;
@@ -408,6 +444,7 @@ static void thread_run(volatile struct thread *me)
                        sig_monitor(0, monitored_wait_and_dispatch, get_fdevepoll());
                        pthread_mutex_lock(&mutex);
                        waitevt = 0;
+#endif
                }
        }
 
@@ -604,7 +641,7 @@ static int do_sync(
  *                 of interrupted flow, the context 'closure' as given and
  *                 a 'jobloop' reference that must be used when the job is
  *                 terminated to unlock the current execution flow.
- * @param arg the argument to the callback
+ * @param closure the argument to the callback
  * @return 0 on success or -1 in case of error
  */
 int jobs_enter(
@@ -685,12 +722,15 @@ static int on_evloop_efd(sd_event_source *s, int fd, uint32_t revents, void *use
        struct evloop *evloop = userdata;
        read(evloop->efd, &x, sizeof x);
        pthread_mutex_lock(&mutex);
-       pthread_cond_broadcast(&evloop->cond);  
+       pthread_cond_broadcast(&evloop->cond);
        pthread_mutex_unlock(&mutex);
        return 1;
 }
 
 /* temporary hack */
+#if !defined(REMOVE_SYSTEMD_EVENT)
+__attribute__((unused))
+#endif
 static void evloop_callback(void *arg, uint32_t event, struct fdev *fdev)
 {
        sig_monitor(0, evloop_run, arg);
@@ -727,6 +767,15 @@ static struct sd_event *get_sd_event_locked()
                rc = sd_event_add_io(el->sdev, NULL, el->efd, EPOLLIN, on_evloop_efd, el);
                if (rc < 0) {
                        ERROR("can't register eventfd");
+#if !defined(REMOVE_SYSTEMD_EVENT)
+                       sd_event_unref(el->sdev);
+                       el->sdev = NULL;
+error2:
+                       close(el->efd);
+error1:
+                       return NULL;
+               }
+#else
                        goto error3;
                }
                /* handle the event loop */
@@ -744,6 +793,7 @@ error1:
                fdev_set_autoclose(el->fdev, 0);
                fdev_set_events(el->fdev, EPOLLIN);
                fdev_set_callback(el->fdev, evloop_callback, el);
+#endif
        }
 
        /* attach the event loop to the current thread */
@@ -779,6 +829,7 @@ struct sd_event *jobs_get_sd_event()
        return result;
 }
 
+#if defined(REMOVE_SYSTEMD_EVENT)
 /**
  * Gets the fdev_epoll item.
  * @return a fdev_epoll or NULL in case of error
@@ -793,6 +844,7 @@ struct fdev_epoll *jobs_get_fdev_epoll()
 
        return result;
 }
+#endif
 
 /**
  * Enter the jobs processing loop.