java.util.stream con ResultSet

Ho pochi tavoli con grandi quantità di dati (circa 100 milioni di record). Quindi non posso memorizzare questi dati in memoria, ma vorrei flusso di questo set di risultati utilizzando java.util.stream di classe e di passare questo flusso di un’altra classe. Ho letto su Stream.of e Stream.Builder operatori, ma essi sono memorizzati i flussi in memoria. C’è qualche modo per risolvere questo problema?
Grazie in anticipo.

AGGIORNAMENTO #1

Ok ho cercato su google e trovato jooq biblioteca. Non sono sicuro, ma sembra che potrebbe essere applicabile al mio caso test. Per riassumere ho pochi tavoli con grandi quantità di dati. Vorrei stream il mio gruppo di risultati e di trasferimento di questo flusso a un altro metodo. Qualcosa di simile a questo:

//why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

Potrebbe piacere a qualcuno consigliare, io sono in modo corretto? Grazie.

UPDATE #2

Ho fatto qualche esperimento su jooq e potrebbe dire ora che la decisione di cui sopra non è adatto per me. Questo codice record = DSL.using(connection).fetch(resultSet).stream(); prende troppo tempo

  • Attenzione che non tutti i flussi sono lo stesso tipo di flusso. java.util.stream.Stream potrebbe non essere adatto per quello che hai in mente.
  • ResultSet è come un ruscello. È possibile elaborare solo una riga del risultato in una sola volta. O vuoi elaborare il ResultSet con le api?
  • Vorrei avvolgere ResultSet a java 8 stream e passare questo stream oggetto di un’altra classe. In un’altra classe, vorrei iterare questo stream e scrivere i risultati in File.
  • Questo è molto vicino ad alcuni dei miei lavori che ho descritto in qui, forse lo troverete utile.
InformationsquelleAutor Iurii | 2015-08-25

 

4 Replies
  1. 64

    La prima cosa che dovete capire è che il codice come

    try (Connection connection = dataSource.getConnection()) {
        
        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            
            return stream;
        }
    }

    non funziona come da tempo di lasciare il try blocchi, le risorse sono chiusi, mentre il trattamento dei Stream non è nemmeno cominciato.

    La gestione delle risorse costrutto di “provare con risorse” opere per le risorse utilizzate all’interno di un ambito blocco all’interno di un metodo, ma si sta creando un metodo factory restituzione di una risorsa. Pertanto, è necessario assicurarsi che la chiusura di ritorno di un flusso di chiudere le risorse e il chiamante è responsabile per la chiusura del Stream.


    Inoltre, avrete bisogno di una funzione che produce un elemento di una singola linea da ResultSet. Supponendo che si dispone di un metodo come

    Record createRecord(ResultSet rs) {
        
    }

    si può creare un Stream<Record> fondamentalmente come

    Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
        Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                if(!resultSet.next()) return false;
                action.accept(createRecord(resultSet));
                return true;
            }
        }, false);

    Ma per farlo correttamente, è necessario integrare la gestione delle eccezioni e la chiusura di risorse. È possibile utilizzare Stream.onClose per registrare un’azione che verrà eseguita quando il Stream viene chiuso, ma deve essere un Runnable che non può lanciare eccezioni selezionate. Allo stesso modo il tryAdvance metodo non è permesso di gettare controllato eccezioni. E dato che non possiamo semplicemente nido try(…) blocchi di qui, la logica del programma di soppressione delle eccezioni gettato in close, quando c’è già in attesa eccezione, non è gratis.

    Per aiutare noi, qui, abbiamo introdotto un nuovo tipo di che può avvolgere le operazioni di chiusura che può lanciare eccezioni selezionate e consegnarli avvolto in una eccezione unchecked. Mediante l’attuazione di AutoCloseable stesso, è possibile utilizzare il try(…) costruire una catena di chiudere le operazioni in sicurezza:

    interface UncheckedCloseable extends Runnable, AutoCloseable {
        default void run() {
            try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
        }
        static UncheckedCloseable wrap(AutoCloseable c) {
            return c::close;
        }
        default UncheckedCloseable nest(AutoCloseable c) {
            return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
        }
    }

    Con questo, l’intera operazione diventa:

    private Stream<Record> tableAsStream(DataSource dataSource, String table)
        throws SQLException {
    
        UncheckedCloseable close=null;
        try {
            Connection connection = dataSource.getConnection();
            close=UncheckedCloseable.wrap(connection);
            String sql = "select * from " + table;
            PreparedStatement pSt = connection.prepareStatement(sql);
            close=close.nest(pSt);
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            close=close.nest(resultSet);
            return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
                Long.MAX_VALUE,Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super Record> action) {
                    try {
                        if(!resultSet.next()) return false;
                        action.accept(createRecord(resultSet));
                        return true;
                    } catch(SQLException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }, false).onClose(close);
        } catch(SQLException sqlEx) {
            if(close!=null)
                try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
            throw sqlEx;
        }
    }

    Questo metodo include il necessario chiudere l’operazione per tutte le risorse, Connection, Statement e ResultSet all’interno di un’istanza della classe di utilità descritto sopra. Se un’eccezione si verifica durante l’inizializzazione, l’operazione di chiusura è effettuata immediatamente e l’eccezione viene consegnato al chiamante. Se il flusso di costruzione di esito positivo, l’operazione di chiusura è registrato tramite onClose.

    Quindi il chiamante deve garantire la corretta chiusura come

    try(Stream<Record> s=tableAsStream(dataSource, table)) {
        //stream operation
    }

    Nota che anche la consegna di un SQLException via RuntimeException è stato aggiunto al tryAdvance metodo. Pertanto, è ora possibile aggiungere throws SQLException per il createRecord metodo senza problemi.

    • votare prima. Penso stream.onClose(UncheckedCloseable.wrap(resource)::close) è più espressività e RuntimeException può essere sostituito con UncheckedIOException. voi cosa ne pensate?
    • sarebbe di nuovo riferimento al metodo che può lanciare eccezioni selezionate, quindi, quello che tu proponi sarebbe rifiutata dal compilatore. Oltre a questo, non credo che è necessario per rendere esplicito che il passaggio di una risorsa onClose invocherà il suo close metodo. UncheckedIOException è adatto per il confezionamento di un IOException, ma non per una soluzione generale, non in particolare in questa risposta, dove abbiamo a che fare con SQLException.
    • Ciao, sono d’accordo sul punto di UncheckedIOException, il ::close sarebbe di nuovo gettare cehcked eccezioni possono essere risolti come run metodo e posso rimuovere il run metodo. sono di destra?
    • potrebbe si può ri-frase? Non ho ricevuto la tua lettera e ho voluto rispondere al tuo commento)…
    • Ciao, quello che ho detto a @Holger è quello di dichiarare una close metodo non getta Exception, e quindi delegare close originale AutoCloseable.close come run metodo. quindi posso togliere il run metodo e fare UncheckedCloseable non è un Runnable e quindi posso usare l’espressione stream.onClose(UncheckedCloseable.wrap(resource)::close) . close metodo UncheckedAutoCloseable sono in conflitto con il suo nome, perché il UncheckedAutoCloseable ancora butta Exception. sono di destra?
    • in primo luogo, il metodo potrebbe avere un nome diverso, altri poi close; perché sarebbe ignorare il AutoCloseable::close, diciamo closeMine. Qualcosa come : default void closeMine() {... // code just like in Runnable. In secondo luogo, che avrebbe continuato a lavorare, ma trovo l’Eseguibile approccio molto più pulito. In realtà è un bel pezzo di codice, che ho già preso la mia personale Utils… ci sono un paio di loro da Holger a questo punto
    • Ho detto più di una frase: “mi dispiace per il mio cattivo inglese.” e ripeto, mi dispiace molto. Non ho bisogno di un metodo predefinito closeMine solo a decorare l’originale AutoCloseable. non è forse un modo più pulito, ma è elegante e close metodo che non entrano in conflitto con il suo nome.
    • l’attuazione di simile a questo: interface UncheckedCloseable extends AutoCloseable { void close(); static AutoCloseable wrap(AutoCloseable it) { return (UncheckedCloseable) () -> { try { it.close(); } catch (Exception e) { throw new RuntimeException(e); } }; } }
    • Ciao, c’è un altro motivo quando si utilizza stream.onClose(UncheckedCloseable.wrap(resource)::close) nel frammento di codice, penso che sia più chiara di @Holger, perché posso vedere il codice che è vicino a una risorsa, dopo chiudere il flusso, ma come @Holger ho bisogno di saltare in UncheckedCloseable e vedere che cosa il run metodo. mi potete dare qualche consiglio?
    • la ragione, perché questa classe implementa AutoCloseable è il nest metodo che consente di combinare con un’altra risorsa che ha bisogno di chiusura. Se UncheckedCloseable avuto un close metodo che non ammette eccezioni selezionate, l’attuazione di nest sarebbe più complicato. D’altra parte, se non si utilizza questo metodo, non c’è bisogno di attuare AutoCloseable a tutti. Infatti, non avrebbe bisogno di quel tipo; un sole static avvolgimento metodo sarebbe sufficiente.
    • A proposito, tutti che UncheckedCloseable.wrap(resource)::close ti dice, è che la funzione viene chiamata close su qualsiasi wrap restituisce. Non dice cosa succede a resource. D’altra parte, una volta che hai capito che resource ha anche un close metodo e che, naturalmente, che close metodo alla fine sarà chiamato, beh, allora non hai bisogno di ::close sul risultato di wrap accennare al fatto che. Come si è detto, che close metodo è del tutto estraneo al close metodo di resource. Non sarebbe diverso avessi scritto UncheckedCloseable.wrap(resource)::run
    • Ciao, Un paio di giorni andati, Come stai? “non c’è bisogno di attuare AutoCloseable a tutti” ho ancora bisogno di un UncheckedCloseable a causa AutoCloseable.close genera un’ Exception ma Runnable non fa. e il nest metodo che posso fare in questo modo: try {wrap(c).close();} finally {close();}. Sono di destra, sir?
    • dal UncheckedCloseable.wrap(resource)::run modo ho ancora bisogno di vedere ciò che il run metodi, ma nel UncheckedCloseable.wrap(resource)::close modo che posso vedere è chiamare il close metodo a causa della wrap metodo è diventato un modello/convenzione sviluppatori di mente. Voi cosa ne pensate?
    • se si lascia che UncheckedCloseable dichiarare una close metodo che non passi controllato eccezioni e passare un metodo di riferimento per onClose, non vi è alcuna rilevanza nell’attuazione AutoCloseable più. Nessuno si aspetta un AutoCloseable ovunque. Per quanto riguarda finally, basta leggere perché “provare con la risorsa” è stato introdotto, semplicemente detto, non finally non è un’opzione. Oh, e non capisco perché devi cercare ciò che run ma non dare per scontato di sapere cosa close fa a causa di un “modello/convenzione”. Tutti gli altri sviluppatori già capire cosa UncheckedCloseable.wrap(resource) non…
    • sì, hai ragione. try-with-resource blocco ha un comportamento diverso tra try-finally blocco quando non riesce, sia sul chiude una risorsa, & non riesce a try blocco. quando vedo il tuo codice, per il momento, di UncheckedCloseable ho davvero bisogno di cercare la run metodo, ma non ho bisogno di guardare in alto è repeatly alla prossima volta. grande e finalmente mi ha convinto, vi ringrazio molto.
    • Ho fatto una semplice libreria per fare proprio questo. È progettato per essere thread-safe (flussi paralleli) e si può anche lasciare che le risorse saranno cancellati automaticamente. Richiede Java 10. github.com/claudemartin/streamed-sql
    • non hai bisogno di inserire un synchronized in tryAdvance metodo. Il codice è già utilizzabile con flussi paralleli come è. Il contratto di Spliterator interfaccia che una singola istanza non è mai accessibile contemporaneamente, proprio come con un Iterator. Il punto chiave per l’elaborazione parallela è che un nuovo Spliterator viene creata un’istanza di via trySplit, di essere trattati da altri thread. Dal momento che un singolo ResultSet non può essere diviso, stai benissimo con il ereditata trySplit attuazione che verrà buffer qualche elemento in un array e restituisce un array spliterator per loro.
    • Ma che cosa circa la visibilità? Non credo jdbc è thread-safe. Voglio che ogni thread per vedere qualsiasi l’ultimo rs.next() fatto. Ho potuto ignorare trySplit in modo che solo uno synch-block è necessario per un batch.
    • il ResultSet è mai visto da thread diversi. Gli altri thread vedere che cosa è la funzione di mappatura è tornato per una determinata riga. Questi oggetti sono già stati pubblicati in un thread modo sicuro dal Flusso di attuazione. Questo presuppone che gli oggetti creati per le diverse righe non condividono stato mutabile, ma che è sempre richiesto, cioè synchronized non aiuterebbe, se violati.
    • Sì, hai ragione. È tutto locale. Appena riesco a rimuoverlo. Io aggiungo la documentazione di ResultSetMapper che il ResultSet non deve essere condiviso con un altro codice.
    • Questo potrebbe essere resa più semplice dal fatto di avere il resultset essere autoclosed da provare-con-le risorse e l’utilizzo di espressioni lambda per il consumatore azione?
    • sì, ma questo sarebbe un codice diverso struttura di un Stream di ritorno del metodo. Entrambe le varianti esistenti, ad esempio questa risposta è come, ad esempio,Files.lines​(…), opere, considerando che il tuo suggerimento è come JDK 9+ StackWalker.walk(…) opere.
    • o sostituire la iniziale UncheckedCloseable close=null; con UncheckedCloseable close=() -> {};; ci sono un sacco di possibilità. Questo codice è più di uno schizzo di finale pronto per la produzione di codice. Per quanto ne so, soluzioni simili trovato la loro strada nelle librerie, in modo da utilizzare queste librerie è il modo preferito di allora.
    • Grazie, ancora in fase di sperimentazione con questo; non mi piace la riassegnazione di close ogni volta

  2. 10

    jOOQ

    Ho intenzione di rispondere alla jOOQ parte della tua domanda. Come di jOOQ 3.8, ci sono stati alcuni ulteriori funzioni relative alla combinazione di jOOQ con Stream. Altri usi sono anche documentato su questo jOOQ pagina.

    Il vostro uso suggerito:

    Hai provato questo:

    Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

    Infatti, questo non funziona bene per grandi insiemi di risultati perché fetch(ResultSet) recupera l’intero set di dati in memoria e quindi chiama Collezione.stream() su di esso.

    Meglio (pigro) utilizzo:

    Invece, si potrebbe scrivere:

    try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
        ...
    }

    … che è essenzialmente la convenienza per questo:

    try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
        Stream<Record> stream = cursor.stream();
        ...
    }

    Vedere anche DSLContext.fetchStream(ResultSet)

    Naturalmente, si potrebbe anche lasciare che jOOQ eseguire la stringa SQL, piuttosto che di wrestling con JDBC:

    try (Stream<Record> stream = 
         DSL.using(dataSource)
            .resultQuery("select * from {0}", DSL.name(table)) //Prevent SQL injection
            .fetchSize(5000)
            .fetchStream()) {
        ...
    }

    A provare-con-utilizzo delle risorse

    Fare notare che un Stream prodotto da jOOQ è “pieno di risorse”, cioè contiene un riferimento a un ResultSet (e PreparedStatement). Quindi, se si vuole veramente tornare in streaming al di fuori del metodo, assicurarsi che sia chiuso correttamente!

  3. 4

    Io non sono a conoscenza di un qualsiasi libreria che farà per voi.

    Che ha detto, questo articolo mostra come avvolgere i risultati con un Iteratore (ResultSetIterator) e farla passare come primo parametro per Spliterators.spliteratorUnknownSize() al fine di creare un Spliterator.

    Il Spliterator può quindi essere utilizzato da StreamSupport al fine di creare un Flusso su di esso.

    Le loro proposte di attuazione di ResultSetIterator classe:

    public class ResultSetIterator implements Iterator {
    
        private ResultSet rs;
        private PreparedStatement ps;
        private Connection connection;
        private String sql;
    
        public ResultSetIterator(Connection connection, String sql) {
            assert connection != null;
            assert sql != null;
            this.connection = connection;
            this.sql = sql;
        }
    
        public void init() {
            try {
                ps = connection.prepareStatement(sql);
                rs = ps.executeQuery();
    
            } catch (SQLException e) {
                close();
                throw new DataAccessException(e);
            }
        }
    
        @Override
        public boolean hasNext() {
            if (ps == null) {
                init();
            }
            try {
                boolean hasMore = rs.next();
                if (!hasMore) {
                    close();
                }
                return hasMore;
            } catch (SQLException e) {
                close();
                throw new DataAccessException(e);
            }
    
        }
    
        private void close() {
            try {
                rs.close();
                try {
                    ps.close();
                } catch (SQLException e) {
                    //nothing we can do here
                }
            } catch (SQLException e) {
                //nothing we can do here
            }
        }
    
        @Override
        public Tuple next() {
            try {
                return SQL.rowAsTuple(sql, rs);
            } catch (DataAccessException e) {
                close();
                throw e;
            }
        }
    }

    e poi:

    public static Stream stream(final Connection connection, 
                                           final String sql, 
                                           final Object... parms) {
      return StreamSupport
                    .stream(Spliterators.spliteratorUnknownSize(
                            new ResultSetIterator(connection, sql), 0), false);
    }
    • Nota che per il corto-circuito di flusso di funzionamento dell’iteratore può essere abbandonata in qualsiasi momento di lasciare l’-chiuso ResultSet. È meglio creare un richiudibili Streaming e richiedono per chiudere esplicitamente dopo l’operazione. Anche perché le materie di tipi in Java-8?
    • come si crea un richiudibili streaming e richiedono per chiuderlo ?
    • Ogni flusso può essere chiusa (come AutoCloseable), ma di default non fa nulla. È possibile aggiungere un vicino gestore come StreamSupport.stream(...).onClose(myIterator::close) (memorizzare il ResultSetIterator in myIterator variabile). È possibile richiedere per chiudere scrivendo il corretto JavaDoc come viene fatto per Files.lines metodo (Se tempestivo smaltimento dei file di risorse di sistema è necessario, provare-con-le risorse di costruire deve essere utilizzato bla bla).
    • grazie!!! Avevo paura che questo giorno passerà senza di me apprendimento di qualcosa, ma ora è venuto… :))) grazie ancora!
    • Prima di tutto, non si dovrebbe usare il raw tipi. Secondo, il Iterator è rotto, come hasNext() ha un inaspettato effetto collaterale come sarà passare alla riga successiva. Questo non è un problema teorico.. Si noti che è possibile risolvere il problema e la metà della dimensione del codice, attraverso l’implementazione di un Spliterator. Infine inutilizzato varargs parametro parms è in cerca di guai.
    • implica che la query può includere parametri (PreparedStatement), questa parte non è stata attuata, perché questo è un esempio che significava per uno scopo diverso, ma potrebbe essere facilmente esteso. Lo stesso vale per il hasNext(), questa implementazione ha il contratto di chiamare hasNext() prima di chiamare next() (che è il modo in cui è di solito fatto) – potrebbe essere cambiato, ma non è questo il punto (e lo stesso vale per le materie di tipi).
    • Non c’è nessun contratto che hasNext() e next() sono associati e ho già collegato a una domanda che mostra il Streams —e si sta creando un flusso di iteratore— non call hasNext più di una volta di tanto in tanto. Non è possibile fare il vostro proprio contratto e dichiarare che il flusso di API ha aderirvi. Come comprovata, che non funziona.
    • questo problema può essere risolto, ma rendere il codice più complesso (mantenendo la rs in un membro della classe a + di un contatore per il numero di chiamate a next()), ma si creerà un extra standard, che batte lo scopo di esempio. Punto di presa, però, grazie.
    • Come già detto, si può risolvere facendo il codice semplice attraverso l’implementazione di un Spliterator invece di un Iterator.
    • per favore, sentitevi liberi di modificare la risposta!

  4. 3

    Qui è il modo più semplice di esempio da AbacusUtil.

    final DataSource ds = JdbcUtil.createDataSource(url, user, password);
    final SQLExecutor sqlExecutor = new SQLExecutor(ds);
    sqlExecutor.stream(sql, parameters);

    Divulgazione: io sono lo sviluppatore di AbacusUtil.

    • Dopo una rapida occhiata al AbacusUtil a me sembra che questa è una gigantesca libreria che vorrei essere molto riluttanti a includere in una soluzione. Si potrebbe desiderare di suddividerlo in moduli più piccoli, dove ho potuto scegliere solo ciò di cui ho realmente bisogno?

Lascia un commento