1 module rdkafkad.iodriver0;
2 
3 version(Have_vibe_d)
4 {
5     enum bool have_vibed = true;
6     public import vibe.core.task: Task;
7     public import vibe.core.concurrency: receiveCompat, Isolated;
8     public import vibe.core.core: runWorkerTaskH;
9     alias _IODelegate = immutable void delegate();
10 
11     // workaround
12     private auto receiveOnlyCompat(ARG)()
13     {
14         import std.meta: Unqual;
15         import std.concurrency: LinkTerminated, OwnerTerminated, MessageMismatch;
16         import std.variant: Variant;
17         import std.format: format;
18         Unqual!ARG ret;
19 
20         receiveCompat(
21             (Isolated!ARG val) { ret = val.extract; },
22             (LinkTerminated e) { throw e; },
23             (OwnerTerminated e) { throw e; },
24             (Variant val) { throw new MessageMismatch(format("Unexpected message type %s, expected %s.", val.type, ARG.stringof)); }
25         );
26 
27         return cast(ARG)ret;
28     }
29 
30     class _IODelegateClass
31     {
32         _IODelegate call;
33 
34         this(_IODelegate call) nothrow @nogc pure @safe
35         {
36             this.call = call;
37         }
38 
39         void opCall()
40         {
41             call();
42         }
43     }
44 
45     __gshared Task _io_task;
46     enum IO(string code) = "
47     {
48         import vibe.core.sync: TaskCondition, TaskMutex;
49         import vibe.core.concurrency: sendCompat, assumeIsolated;
50         import core.time : msecs;
51         auto condition = new TaskCondition(new TaskMutex);
52         _IODelegate _io_delegate_ = ()
53             {
54                 " ~ code ~ "
55                 condition.notify;
56             };
57         sendCompat(_io_task, assumeIsolated(new _IODelegateClass(_io_delegate_)));
58         condition.mutex.lock;
59         condition.wait;
60         condition.mutex.unlock;
61     }";
62 
63     void _io_handler()
64     {
65         for(;;)
66         {
67             auto call = receiveOnlyCompat!_IODelegateClass;
68             call();
69         }
70     }
71 }
72 else
73 version(Have_vibe_core)
74 {
75     enum bool have_vibed = true;
76     static assert("rdkafkad: support for vibe-core >=1.0.0 is not implemented yet.");
77 }
78 else
79 {
80     enum bool have_vibed = false;
81     enum IO(string code) = "{" ~ code ~ "}";
82 }