prof: fix disgusting synchronization
author		Barret Rhoden <brho@cs.berkeley.edu>
		Sun, 12 May 2019 22:00:43 +0000 (18:00 -0400)
committer	Barret Rhoden <brho@cs.berkeley.edu>
		Sun, 12 May 2019 22:00:43 +0000 (18:00 -0400)
The profiler's sync code was always an annoyance - the first version was
nasty and I found bugs with it.  The second version (current code) had
layers of krefs and qlocks and whatnot, obfuscating things.  I didn't
see the bug at the time, but syzbot found it.  Absence of evidence is
not evidence of absence.

The root of the issue was that the kref was used to handle the
tear-down, but the open / setup code assumed any previous close /
cleanup had fully completed.  If a core held a kref across a quick
close-open pair, we'd crash.  Specifically, this would die:

Core 1                          Core 2
-----------------------------------------------
open
  set ref = 1
                                start BT sample
                                kref_get (ref == 2)
close
  decref, ref == 1

open
  panic!  (ref != 0)
                                eventually decref

Fixing it requires blocking close / cleanup until all of the users are
done (the users being the cores pushing backtrace samples).  That's a
great use for RCU.
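
As a minimal sketch of that ordering (this is essentially the new
profiler_cleanup() in the patch below): close / cleanup unpublishes the
profiler, then waits out the samplers before freeing anything.

  struct profiler *prof = rcu_dereference_protected(gbl_prof, true);

  RCU_INIT_POINTER(gbl_prof, NULL);
  synchronize_rcu();      /* every sampler that saw 'prof' has finished */
  kfree(prof->pcpu_ctx);
  qfree(prof->qio);
  kfree(prof);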

This commit replaces the kref with RCU, and fixes a bunch of other nasty
things along the way.  To do it properly, I created struct profiler to
encapsulate the qio and the per-core array, and protected that entire
thing with RCU.
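
With that in place, the sampler push path is just an RCU read-side
critical section.  For example, the new kernel-backtrace push (copied
from the diff below):

  void profiler_push_kernel_backtrace(uintptr_t *pc_list, size_t nr_pcs,
                                      uint64_t info)
  {
          struct profiler *prof;

          rcu_read_lock();
          prof = rcu_dereference(gbl_prof);
          if (prof) {
                  struct profiler_cpu_context *cpu_buf =
                          profiler_get_cpu_ctx(prof, core_id());

                  if (cpu_buf->tracing)
                          profiler_push_kernel_trace64(prof, cpu_buf,
                                                       pc_list, nr_pcs,
                                                       info);
          }
          rcu_read_unlock();
  }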

This resulted in a lot of cleanups.  Various assertions and checks for
the existence of profiler_queue and the cpu buffer array all disappear.
I also explicitly documented the synchronization rules, and got rid of
the superfluous profiler mutex.

Additionally, a lot of the error handling goes away.  Some of it was due
to completely unnecessary use of waserror() - e.g. catching errors for
functions that don't throw.  There was also an unmatched waserror() in
the old code.  That's all gone.
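
For reference, a matched pair looks like this (just a sketch; the helper
names are made up):

  ERRSTACK(1);

  setup_that_cannot_throw();      /* no waserror() needed for this */
  if (waserror()) {
          undo_setup();
          nexterror();
  }
  something_that_can_error();     /* may longjmp back to the waserror() */
  poperror();                     /* must balance the waserror() above */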

Other minor fixups:
- kprof couldn't handle an error in profiler_setup().  If it failed,
  we'd just say the profiler was opened.
- kmalloc(x, 0) -> kmalloc(x, MEM_ATOMIC)
- WRITE_ONCE() for the unsynchronized configuration variable writes (see
  the example below).  Note that nothing in our codebase even uses those
  config options, and their usage is a little hokey.
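
Concretely, the write and the eventual read (both from the hunks below):

  /* writer: the 'prof_qlimit' ctl command, in profiler_configure() */
  WRITE_ONCE(profiler_queue_limit,
             profiler_get_checked_value(cb->f[1], 1024, 1024 * 1024,
                                        max_pmem / 32));

  /* reader: profiler_setup(), at the next open */
  prof->qio = qopen(profiler_queue_limit, 0, NULL, NULL);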

Anyway, there might still be some bugs here, but hopefully it's a little
better and clearer.

Reported-by: syzbot+e8998ed113c341947438@syzkaller.appspotmail.com
Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/drivers/dev/kprof.c
kern/include/profiler.h
kern/src/profiler.c

diff --git a/kern/drivers/dev/kprof.c b/kern/drivers/dev/kprof.c
index 243f97f..832d2c0 100644
--- a/kern/drivers/dev/kprof.c
+++ b/kern/drivers/dev/kprof.c
@@ -160,8 +160,6 @@ static void kprof_flush_profiler(void)
 
 static void kprof_init(void)
 {
-       profiler_init();
-
        qlock_init(&kprof.lock);
        kprof.profiling = FALSE;
        kprof.opened = FALSE;
@@ -225,9 +223,11 @@ static struct chan *kprof_open(struct chan *c, int omode)
                        qunlock(&kprof.lock);
                        error(EBUSY, "Global profiler is already open");
                }
+               if (profiler_setup() < 0) {
+                       qunlock(&kprof.lock);
+                       error(ENOMEM, "failed to set up profiler");
+               }
                kprof.opened = TRUE;
-               /* TODO: have a creation function for a non-global profiler */
-               profiler_setup();
                qunlock(&kprof.lock);
                break;
        }
diff --git a/kern/include/profiler.h b/kern/include/profiler.h
index 41dc839..6b04210 100644
--- a/kern/include/profiler.h
+++ b/kern/include/profiler.h
@@ -1,32 +1,41 @@
 /* Copyright (c) 2015 Google Inc
  * Davide Libenzi <dlibenzi@google.com>
  * See LICENSE for details.
- */
+ *
+ * Synchronization is a little hokey - we assume the external caller (kprof)
+ * makes sure that there is only one call to profiler_setup(), followed by a
+ * call to profiler_cleanup().  Between these calls, that thread can make
+ * serialized calls to profiler_{start,stop,trace_data_flush}(). */
 
 #pragma once
 
 #include <stdio.h>
 #include <ros/profiler_records.h>
 
-struct hw_trapframe;
 struct proc;
 struct file_or_chan;
 struct cmdbuf;
 
-int profiler_configure(struct cmdbuf *cb);
-void profiler_append_configure_usage(char *msgbuf, size_t buflen);
-void profiler_init(void);
-void profiler_setup(void);
+/* Caller (kprof) ensures at most one call to setup and then cleanup. */
+int profiler_setup(void);
 void profiler_cleanup(void);
+
+/* Call these one at a time after setup and before cleanup. */
 void profiler_start(void);
 void profiler_stop(void);
+void profiler_trace_data_flush(void);
+
+/* Call these anytime.  If the profiler is off, they will be ignored.  Some
+ * configure options won't take effect until the next profiler run. */
+int profiler_configure(struct cmdbuf *cb);
+void profiler_append_configure_usage(char *msgbuf, size_t buflen);
+
 void profiler_push_kernel_backtrace(uintptr_t *pc_list, size_t nr_pcs,
                                     uint64_t info);
 void profiler_push_user_backtrace(uintptr_t *pc_list, size_t nr_pcs,
                                   uint64_t info);
-void profiler_trace_data_flush(void);
-int profiler_size(void);
-int profiler_read(void *va, int n);
+size_t profiler_size(void);
+size_t profiler_read(void *va, size_t n);
 void profiler_notify_mmap(struct proc *p, uintptr_t addr, size_t size, int prot,
                          int flags, struct file_or_chan *foc, size_t offset);
 void profiler_notify_new_process(struct proc *p);
diff --git a/kern/src/profiler.c b/kern/src/profiler.c
index ede0abc..6103027 100644
--- a/kern/src/profiler.c
+++ b/kern/src/profiler.c
  * profiler_notify_mmap()) just go straight to the central queue.
  *
  * Currently there is one global profiler.  Kprof is careful to only have one
- * open profiler at a time.  We assert that this is true.  TODO: stop using the
- * global profiler!
+ * open profiler at a time.  See profiler.h for more info.  The only sync we do
+ * in this file is for the functions that are not called while holding the kprof
+ * mutex - specifically the RCU-protected backtrace sampling code.
  *
  * A few other notes:
  * - profiler_control_trace() controls the per-core trace collection.  When it
  *   is disabled, it also flushes the per-core blocks to the central queue.
  * - The collection of mmap and comm samples is independent of trace collection.
- *   Those will occur whenever the profiler is open (refcnt check, for now). */
+ *   Those will occur whenever the profiler is open, even if it is not started.
+ * - Looks like we don't bother with munmap records.  Not sure if perf can
+ *   handle it or not. */
 
 #include <ros/common.h>
 #include <ros/mman.h>
@@ -32,7 +35,6 @@
 #include <mm.h>
 #include <kmalloc.h>
 #include <pmap.h>
-#include <kref.h>
 #include <atomic.h>
 #include <umem.h>
 #include <elf.h>
 struct profiler_cpu_context {
        struct block *block;
        int cpu;
-       int tracing;
+       bool tracing;
        size_t dropped_data_cnt;
 };
 
+/* These are a little hokey, and are currently global vars */
 static int profiler_queue_limit = 64 * 1024 * 1024;
 static size_t profiler_cpu_buffer_size = 65536;
-static qlock_t profiler_mtx = QLOCK_INITIALIZER(profiler_mtx);
-static struct kref profiler_kref;
-static struct profiler_cpu_context *profiler_percpu_ctx;
-static struct queue *profiler_queue;
 
-static inline struct profiler_cpu_context *profiler_get_cpu_ctx(int cpu)
+struct profiler {
+       struct profiler_cpu_context *pcpu_ctx;
+       struct queue *qio;
+       bool tracing;
+};
+
+static struct profiler __rcu *gbl_prof;
+
+static struct profiler_cpu_context *profiler_get_cpu_ctx(struct profiler *prof,
+                                                        int cpu)
 {
-       return profiler_percpu_ctx + cpu;
+       return prof->pcpu_ctx + cpu;
 }
 
 static inline char *vb_encode_uint64(char *data, uint64_t n)
@@ -78,14 +86,15 @@ static inline char *vb_encode_uint64(char *data, uint64_t n)
        return data;
 }
 
-static struct block *profiler_buffer_write(struct profiler_cpu_context *cpu_buf,
+static struct block *profiler_buffer_write(struct profiler *prof,
+                                          struct profiler_cpu_context *cpu_buf,
                                            struct block *b)
 {
        /* qpass will drop b if the queue is over its limit.  we're willing to
         * lose traces, but we won't lose 'control' events, such as MMAP and
         * PID. */
        if (b) {
-               if (qpass(profiler_queue, b) < 0)
+               if (qpass(prof->qio, b) < 0)
                        cpu_buf->dropped_data_cnt++;
        }
        return block_alloc(profiler_cpu_buffer_size, MEM_ATOMIC);
@@ -95,13 +104,13 @@ static struct block *profiler_buffer_write(struct profiler_cpu_context *cpu_buf,
  * enough room in the pcpu block for our write.  May alloc a new one.
  *
  * IRQs must be disabled before calling, until after write_commit. */
-static char *profiler_cpu_buffer_write_reserve(
+static char *profiler_cpu_buffer_write_reserve(struct profiler *prof,
        struct profiler_cpu_context *cpu_buf, size_t size, struct block **pb)
 {
        struct block *b = cpu_buf->block;
 
        if (unlikely((!b) || (b->lim - b->wp) < size)) {
-               cpu_buf->block = b = profiler_buffer_write(cpu_buf, b);
+               cpu_buf->block = b = profiler_buffer_write(prof, cpu_buf, b);
                if (unlikely(!b))
                        return NULL;
        }
@@ -124,7 +133,8 @@ static inline size_t profiler_max_envelope_size(void)
        return 2 * VBE_MAX_SIZE(uint64_t);
 }
 
-static void profiler_push_kernel_trace64(struct profiler_cpu_context *cpu_buf,
+static void profiler_push_kernel_trace64(struct profiler *prof,
+                                        struct profiler_cpu_context *cpu_buf,
                                          const uintptr_t *trace, size_t count,
                                          uint64_t info)
 {
@@ -135,7 +145,7 @@ static void profiler_push_kernel_trace64(struct profiler_cpu_context *cpu_buf,
        void *resptr, *ptr;
 
        assert(!irq_is_enabled());
-       resptr = profiler_cpu_buffer_write_reserve(
+       resptr = profiler_cpu_buffer_write_reserve(prof,
            cpu_buf, size + profiler_max_envelope_size(), &b);
        ptr = resptr;
 
@@ -163,7 +173,8 @@ static void profiler_push_kernel_trace64(struct profiler_cpu_context *cpu_buf,
        }
 }
 
-static void profiler_push_user_trace64(struct profiler_cpu_context *cpu_buf,
+static void profiler_push_user_trace64(struct profiler *prof,
+                                      struct profiler_cpu_context *cpu_buf,
                                        struct proc *p, const uintptr_t *trace,
                                        size_t count, uint64_t info)
 {
@@ -173,7 +184,7 @@ static void profiler_push_user_trace64(struct profiler_cpu_context *cpu_buf,
        void *resptr, *ptr;
 
        assert(!irq_is_enabled());
-       resptr = profiler_cpu_buffer_write_reserve(
+       resptr = profiler_cpu_buffer_write_reserve(prof,
            cpu_buf, size + profiler_max_envelope_size(), &b);
        ptr = resptr;
 
@@ -198,12 +209,13 @@ static void profiler_push_user_trace64(struct profiler_cpu_context *cpu_buf,
        }
 }
 
-static void profiler_push_pid_mmap(struct proc *p, uintptr_t addr, size_t msize,
-                                   size_t offset, const char *path)
+static void profiler_push_pid_mmap(struct profiler *prof, struct proc *p,
+                                  uintptr_t addr, size_t msize, size_t offset,
+                                  const char *path)
 {
        size_t plen = strlen(path) + 1;
        size_t size = sizeof(struct proftype_pid_mmap64) + plen;
-       void *resptr = kmalloc(size + profiler_max_envelope_size(), 0);
+       void *resptr = kmalloc(size + profiler_max_envelope_size(), MEM_ATOMIC);
 
        if (likely(resptr)) {
                void *ptr = resptr;
@@ -222,17 +234,17 @@ static void profiler_push_pid_mmap(struct proc *p, uintptr_t addr, size_t msize,
                record->offset = offset;
                memcpy(record->path, path, plen);
 
-               qiwrite(profiler_queue, resptr, (int) (ptr - resptr));
+               qiwrite(prof->qio, resptr, (int) (ptr - resptr));
 
                kfree(resptr);
        }
 }
 
-static void profiler_push_new_process(struct proc *p)
+static void profiler_push_new_process(struct profiler *prof, struct proc *p)
 {
        size_t plen = strlen(p->binary_path) + 1;
        size_t size = sizeof(struct proftype_new_process) + plen;
-       void *resptr = kmalloc(size + profiler_max_envelope_size(), 0);
+       void *resptr = kmalloc(size + profiler_max_envelope_size(), MEM_ATOMIC);
 
        if (likely(resptr)) {
                void *ptr = resptr;
@@ -248,7 +260,7 @@ static void profiler_push_new_process(struct proc *p)
                record->pid = p->pid;
                memcpy(record->path, p->binary_path, plen);
 
-               qiwrite(profiler_queue, resptr, (int) (ptr - resptr));
+               qiwrite(prof->qio, resptr, (int) (ptr - resptr));
 
                kfree(resptr);
        }
@@ -266,65 +278,18 @@ static void profiler_emit_current_system_status(void)
                                     vmr->vm_foff);
        }
 
-       ERRSTACK(1);
        struct process_set pset;
 
        proc_get_set(&pset);
-       if (waserror()) {
-               proc_free_set(&pset);
-               nexterror();
-       }
 
        for (size_t i = 0; i < pset.num_processes; i++) {
                profiler_notify_new_process(pset.procs[i]);
                enumerate_vmrs(pset.procs[i], enum_proc, pset.procs[i]);
        }
 
-       poperror();
        proc_free_set(&pset);
 }
 
-static void free_cpu_buffers(void)
-{
-       kfree(profiler_percpu_ctx);
-       profiler_percpu_ctx = NULL;
-
-       if (profiler_queue) {
-               qfree(profiler_queue);
-               profiler_queue = NULL;
-       }
-}
-
-static void alloc_cpu_buffers(void)
-{
-       ERRSTACK(1);
-
-       /* It is very important that we enqueue and dequeue entire records at
-        * once.  If we leave partial records, the entire stream will be
-        * corrupt.  Our reader does its best to make sure it has room for
-        * complete records (checks qlen()).
-        *
-        * If we ever get corrupt streams, try making this a Qmsg.  Though it
-        * doesn't help every situation - we have issues with writes greater
-        * than Maxatomic regardless. */
-       profiler_queue = qopen(profiler_queue_limit, 0, NULL, NULL);
-       if (!profiler_queue)
-               error(ENOMEM, ERROR_FIXME);
-       if (waserror()) {
-               free_cpu_buffers();
-               nexterror();
-       }
-
-       profiler_percpu_ctx =
-           kzmalloc(sizeof(*profiler_percpu_ctx) * num_cores, MEM_WAIT);
-
-       for (int i = 0; i < num_cores; i++) {
-               struct profiler_cpu_context *b = &profiler_percpu_ctx[i];
-
-               b->cpu = i;
-       }
-}
-
 static long profiler_get_checked_value(const char *value, long k, long minval,
                                        long maxval)
 {
@@ -338,22 +303,33 @@ static long profiler_get_checked_value(const char *value, long k, long minval,
        return lvalue;
 }
 
+/* TODO: This configure stuff is a little hokey.  You have to configure before
+ * it's been opened, meaning before you have the kprofctlqid, but you can't
+ * configure until you have the chan.  To use this, you'd need to open, then
+ * config, then close, then hope that the global settings stick around, then
+ * open and run it.
+ *
+ * Also note that no one uses this. */
 int profiler_configure(struct cmdbuf *cb)
 {
        if (!strcmp(cb->f[0], "prof_qlimit")) {
                if (cb->nf < 2)
                        error(EFAIL, "prof_qlimit KB");
-               if (kref_refcnt(&profiler_kref) > 0)
-                       error(EFAIL, "Profiler already running");
-               profiler_queue_limit = (int) profiler_get_checked_value(
-                       cb->f[1], 1024, 1024 * 1024, max_pmem / 32);
+               /* If the profiler is already running, this won't take effect
+                * until the next open.  Feel free to change this. */
+               WRITE_ONCE(profiler_queue_limit,
+                          profiler_get_checked_value(cb->f[1], 1024,
+                                                     1024 * 1024,
+                                                     max_pmem / 32));
                return 1;
        }
        if (!strcmp(cb->f[0], "prof_cpubufsz")) {
                if (cb->nf < 2)
                        error(EFAIL, "prof_cpubufsz KB");
-               profiler_cpu_buffer_size = (size_t) profiler_get_checked_value(
-                       cb->f[1], 1024, 16 * 1024, 1024 * 1024);
+               WRITE_ONCE(profiler_cpu_buffer_size,
+                          profiler_get_checked_value(cb->f[1], 1024,
+                                                     16 * 1024,
+                                                     1024 * 1024));
                return 1;
        }
 
@@ -373,191 +349,214 @@ void profiler_append_configure_usage(char *msgbuf, size_t buflen)
        }
 }
 
-static void profiler_release(struct kref *kref)
-{
-       bool got_reference = FALSE;
-
-       assert(kref == &profiler_kref);
-       qlock(&profiler_mtx);
-       /* Make sure we did not race with profiler_setup(), that got the
-        * profiler_mtx lock just before us, and re-initialized the profiler
-        * for a new user.
-        * If we race here from another profiler_release() (user did a
-        * profiler_setup() immediately followed by a profiler_cleanup()) we are
-        * fine because free_cpu_buffers() can be called multiple times.
-        */
-       if (!kref_get_not_zero(kref, 1))
-               free_cpu_buffers();
-       else
-               got_reference = TRUE;
-       qunlock(&profiler_mtx);
-       /* We cannot call kref_put() within the profiler_kref lock, as such call
-        * might trigger anohter call to profiler_release().
-        */
-       if (got_reference)
-               kref_put(kref);
-}
-
-void profiler_init(void)
-{
-       assert(kref_refcnt(&profiler_kref) == 0);
-       kref_init(&profiler_kref, profiler_release, 0);
-}
-
-void profiler_setup(void)
+int profiler_setup(void)
 {
-       ERRSTACK(1);
+       struct profiler *prof;
 
-       qlock(&profiler_mtx);
-       if (waserror()) {
-               qunlock(&profiler_mtx);
-               nexterror();
+       assert(!rcu_dereference_check(gbl_prof, true));
+       prof = kzmalloc(sizeof(struct profiler), MEM_WAIT);
+       /* It is very important that we enqueue and dequeue entire records at
+        * once.  If we leave partial records, the entire stream will be
+        * corrupt.  Our reader does its best to make sure it has room for
+        * complete records (checks qlen()).
+        *
+        * If we ever get corrupt streams, try making this a Qmsg.  Though it
+        * doesn't help every situation - we have issues with writes greater
+        * than Maxatomic regardless. */
+       prof->qio = qopen(profiler_queue_limit, 0, NULL, NULL);
+       if (!prof->qio) {
+               kfree(prof);
+               return -1;
        }
-       assert(!profiler_queue);
-       alloc_cpu_buffers();
-
-       /* Do this only when everything is initialized (as last init operation).
-        */
-       __kref_get(&profiler_kref, 1);
+       prof->pcpu_ctx = kzmalloc(sizeof(struct profiler_cpu_context)
+                                 * num_cores, MEM_WAIT);
+       for (int i = 0; i < num_cores; i++) {
+               struct profiler_cpu_context *b = &prof->pcpu_ctx[i];
 
+               b->cpu = i;
+       }
+       rcu_assign_pointer(gbl_prof, prof);
        profiler_emit_current_system_status();
-
-       poperror();
-       qunlock(&profiler_mtx);
+       return 0;
 }
 
 void profiler_cleanup(void)
 {
-       kref_put(&profiler_kref);
+       struct profiler *prof = rcu_dereference_protected(gbl_prof, true);
+
+       RCU_INIT_POINTER(gbl_prof, NULL);
+       synchronize_rcu();
+       kfree(prof->pcpu_ctx);
+       qfree(prof->qio);
+       kfree(prof);
 }
 
-static void profiler_cpu_flush(struct profiler_cpu_context *cpu_buf)
+static void profiler_cpu_flush(struct profiler *prof,
+                              struct profiler_cpu_context *cpu_buf)
 {
        int8_t irq_state = 0;
 
        disable_irqsave(&irq_state);
-       if (cpu_buf->block && profiler_queue) {
-               qibwrite(profiler_queue, cpu_buf->block);
+       if (cpu_buf->block) {
+               qibwrite(prof->qio, cpu_buf->block);
 
                cpu_buf->block = NULL;
        }
        enable_irqsave(&irq_state);
 }
 
-static void profiler_core_trace_enable(void *opaque)
+static void __profiler_core_trace_enable(void *opaque)
 {
-       struct profiler_cpu_context *cpu_buf = profiler_get_cpu_ctx(core_id());
+       struct profiler *prof = opaque;
+       struct profiler_cpu_context *cpu_buf = profiler_get_cpu_ctx(prof,
+                                                                   core_id());
 
-       cpu_buf->tracing = (int) (opaque != NULL);
+       cpu_buf->tracing = prof->tracing;
        if (!cpu_buf->tracing)
-               profiler_cpu_flush(cpu_buf);
+               profiler_cpu_flush(prof, cpu_buf);
 }
 
-static void profiler_control_trace(int onoff)
+static void profiler_control_trace(struct profiler *prof, int onoff)
 {
        struct core_set cset;
 
-       error_assert(EINVAL, profiler_percpu_ctx);
+       assert(prof);
 
        core_set_init(&cset);
        core_set_fill_available(&cset);
-       smp_do_in_cores(&cset, profiler_core_trace_enable,
-                       (void *) (uintptr_t) onoff);
+       prof->tracing = onoff;
+       /* Note this blocks until all cores have run the function. */
+       smp_do_in_cores(&cset, __profiler_core_trace_enable, prof);
 }
 
+/* This must only be called by the Kprofctlqid FD holder, ensuring that the
+ * profiler exists.  Not thread-safe. */
 void profiler_start(void)
 {
-       assert(profiler_queue);
-       profiler_control_trace(1);
-       qreopen(profiler_queue);
+       struct profiler *prof = rcu_dereference_protected(gbl_prof, true);
+
+       profiler_control_trace(prof, 1);
+       qreopen(prof->qio);
 }
 
+/* This must only be called by the Kprofctlqid FD holder, ensuring that the
+ * profiler exists.  Not thread-safe. */
 void profiler_stop(void)
 {
-       assert(profiler_queue);
-       profiler_control_trace(0);
-       qhangup(profiler_queue, 0);
+       struct profiler *prof = rcu_dereference_protected(gbl_prof, true);
+
+       profiler_control_trace(prof, 0);
+       qhangup(prof->qio, 0);
 }
 
-static void profiler_core_flush(void *opaque)
+static void __profiler_core_flush(void *opaque)
 {
-       struct profiler_cpu_context *cpu_buf = profiler_get_cpu_ctx(core_id());
+       struct profiler *prof = opaque;
+       struct profiler_cpu_context *cpu_buf = profiler_get_cpu_ctx(prof,
+                                                                   core_id());
 
-       profiler_cpu_flush(cpu_buf);
+       profiler_cpu_flush(prof, cpu_buf);
 }
 
+/* This must only be called by the Kprofctlqid FD holder, ensuring that the
+ * profiler exists. */
 void profiler_trace_data_flush(void)
 {
        struct core_set cset;
 
-       error_assert(EINVAL, profiler_percpu_ctx);
-
        core_set_init(&cset);
        core_set_fill_available(&cset);
-       smp_do_in_cores(&cset, profiler_core_flush, NULL);
+       smp_do_in_cores(&cset, __profiler_core_flush, NULL);
 }
 
 void profiler_push_kernel_backtrace(uintptr_t *pc_list, size_t nr_pcs,
                                     uint64_t info)
 {
-       if (kref_get_not_zero(&profiler_kref, 1)) {
+       struct profiler *prof;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       if (prof) {
                struct profiler_cpu_context *cpu_buf =
-                       profiler_get_cpu_ctx(core_id());
+                       profiler_get_cpu_ctx(prof, core_id());
 
-               if (profiler_percpu_ctx && cpu_buf->tracing)
-                       profiler_push_kernel_trace64(cpu_buf, pc_list, nr_pcs,
-                                                    info);
-               kref_put(&profiler_kref);
+               if (cpu_buf->tracing)
+                       profiler_push_kernel_trace64(prof, cpu_buf, pc_list,
+                                                    nr_pcs, info);
        }
+       rcu_read_unlock();
 }
 
 void profiler_push_user_backtrace(uintptr_t *pc_list, size_t nr_pcs,
                                   uint64_t info)
 {
-       if (kref_get_not_zero(&profiler_kref, 1)) {
-               struct proc *p = current;
+       struct profiler *prof;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       if (prof) {
                struct profiler_cpu_context *cpu_buf =
-                       profiler_get_cpu_ctx(core_id());
+                       profiler_get_cpu_ctx(prof, core_id());
 
-               if (profiler_percpu_ctx && cpu_buf->tracing)
-                       profiler_push_user_trace64(cpu_buf, p, pc_list, nr_pcs,
-                                                  info);
-               kref_put(&profiler_kref);
+               if (cpu_buf->tracing)
+                       profiler_push_user_trace64(prof, cpu_buf, current,
+                                                  pc_list, nr_pcs, info);
        }
+       rcu_read_unlock();
 }
 
-int profiler_size(void)
+size_t profiler_size(void)
 {
-       return profiler_queue ? qlen(profiler_queue) : 0;
+       struct profiler *prof;
+       size_t ret;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       ret = prof ? qlen(prof->qio) : 0;
+       rcu_read_unlock();
+       return ret;
 }
 
-int profiler_read(void *va, int n)
+size_t profiler_read(void *va, size_t n)
 {
-       return profiler_queue ? qread(profiler_queue, va, n) : 0;
+       struct profiler *prof;
+       size_t ret;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       ret = prof ? qread(prof->qio, va, n) : 0;
+       rcu_read_unlock();
+       return ret;
 }
 
 void profiler_notify_mmap(struct proc *p, uintptr_t addr, size_t size, int prot,
                           int flags, struct file_or_chan *foc, size_t offset)
 {
-       if (kref_get_not_zero(&profiler_kref, 1)) {
-               if (foc && (prot & PROT_EXEC) && profiler_percpu_ctx) {
+       struct profiler *prof;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       if (prof) {
+               if (foc && (prot & PROT_EXEC)) {
                        char path_buf[PROFILER_MAX_PRG_PATH];
                        char *path = foc_abs_path(foc, path_buf,
                                                  sizeof(path_buf));
 
                        if (likely(path))
-                               profiler_push_pid_mmap(p, addr, size, offset,
-                                                      path);
+                               profiler_push_pid_mmap(prof, p, addr, size,
+                                                      offset, path);
                }
-               kref_put(&profiler_kref);
        }
+       rcu_read_unlock();
 }
 
 void profiler_notify_new_process(struct proc *p)
 {
-       if (kref_get_not_zero(&profiler_kref, 1)) {
-               if (profiler_percpu_ctx && p->binary_path)
-                       profiler_push_new_process(p);
-               kref_put(&profiler_kref);
-       }
+       struct profiler *prof;
+
+       rcu_read_lock();
+       prof = rcu_dereference(gbl_prof);
+       if (prof && p->binary_path)
+               profiler_push_new_process(prof, p);
+       rcu_read_unlock();
 }