/++
I/O driver glue for rdkafkad.

Exposes two compile-time symbols:
$(UL
    $(LI `have_vibed` - `true` when the build is configured with vibe.d.)
    $(LI `IO!code` - a string meant to be mixed in around the statements in `code`.)
)
With vibe.d, `IO!code` wraps `code` into a delegate, sends it to `_io_task`
and waits on a `TaskCondition` until the delegate signals completion.
Without vibe.d, `IO!code` is just `code` wrapped in a braces scope.
+/
module rdkafkad.iodriver0;

version(Have_vibe_d)
{
    enum bool have_vibed = true;
    public import vibe.core.task: Task;
    public import vibe.core.concurrency: receiveCompat, Isolated;
    public import vibe.core.core: runWorkerTaskH;

    /// Type of the callbacks produced by `IO` and executed by `_io_handler`.
    alias _IODelegate = immutable void delegate();

    // workaround
    /++
    Receives exactly one message of type `Isolated!ARG` and returns its extracted payload.

    `LinkTerminated` and `OwnerTerminated` messages are rethrown as is.

    Throws: `MessageMismatch` when a message of any other type is received.
    +/
    private auto receiveOnlyCompat(ARG)()
    {
        // `Unqual` is declared in std.traits.
        import std.traits: Unqual;
        import std.concurrency: LinkTerminated, OwnerTerminated, MessageMismatch;
        import std.variant: Variant;
        import std.format: format;
        Unqual!ARG ret;

        receiveCompat(
            (Isolated!ARG val) { ret = val.extract; },
            (LinkTerminated e) { throw e; },
            (OwnerTerminated e) { throw e; },
            (Variant val) { throw new MessageMismatch(format("Unexpected message type %s, expected %s.", val.type, ARG.stringof)); }
        );

        return cast(ARG)ret;
    }

    /++
    Class wrapper around an `_IODelegate`.

    `IO` sends instances of this class (wrapped with `assumeIsolated`) to `_io_task`;
    `_io_handler` receives them and invokes them through `opCall`.
    +/
    class _IODelegateClass
    {
        /// The wrapped callback.
        _IODelegate call;

        /// Stores `call` for a later invocation.
        this(_IODelegate call) nothrow @nogc pure @safe
        {
            this.call = call;
        }

        /// Invokes the wrapped callback.
        void opCall()
        {
            call();
        }
    }

    /// Task that `IO` sends its callbacks to.
    /// NOTE(review): it is not assigned in this module; presumably it is started
    /// elsewhere with `runWorkerTaskH` running `_io_handler` - confirm.
    __gshared Task _io_task;

    /++
    Builds the source of a block statement that runs `code` on `_io_task`
    and blocks the current task until `code` has finished.

    Intended usage: `mixin(IO!"statements;");`

    NOTE(review): `condition.notify` is issued by the receiving task without holding
    the mutex and may run before the sender reaches `condition.wait`; if
    `TaskCondition` does not remember such a notification the sender would wait
    forever. Likewise, an exception thrown by `code` skips `condition.notify`.
    Both points should be confirmed against the vibe.d version in use.
    +/
    enum IO(string code) = "
    {
        import vibe.core.sync: TaskCondition, TaskMutex;
        import vibe.core.concurrency: sendCompat, assumeIsolated;
        import core.time : msecs;
        auto condition = new TaskCondition(new TaskMutex);
        _IODelegate _io_delegate_ = ()
        {
            " ~ code ~ "
            condition.notify;
        };
        sendCompat(_io_task, assumeIsolated(new _IODelegateClass(_io_delegate_)));
        condition.mutex.lock;
        condition.wait;
        condition.mutex.unlock;
    }";

    /++
    Endless loop that receives `_IODelegateClass` messages one at a time and invokes them.

    The loop has no exit condition; it only ends when `receiveOnlyCompat`
    or the invoked callback throws.
    +/
    void _io_handler()
    {
        for(;;)
        {
            auto call = receiveOnlyCompat!_IODelegateClass;
            call();
        }
    }
}
else
version(Have_vibe_core)
{
    enum bool have_vibed = true;
    // The condition must be an explicit `0`: a lone string literal is treated as the
    // asserted expression, evaluates to true, and would let the build pass silently.
    static assert(0, "rdkafkad: support for vibe-core >=1.0.0 is not implemented yet.");
}
else
{
    enum bool have_vibed = false;

    /// Without vibe.d the code is executed in place, inside its own scope.
    enum IO(string code) = "{" ~ code ~ "}";
}